From 701c9bf262cef3f1ae09b9031c3714eb75e804c2 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 21 Oct 2024 09:22:32 +0200 Subject: [PATCH 01/24] added function for tracing dax queries --- src/sempy_labs/_dax.py | 62 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 798194cd..ef85ea6b 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -1,11 +1,14 @@ import sempy.fabric as fabric import pandas as pd +from typing import Optional +from sempy._utils._log import log +import time +import sempy_labs._icons as icons from sempy_labs._helper_functions import ( resolve_dataset_id, resolve_workspace_name_and_id, ) -from typing import Optional -from sempy._utils._log import log +from sempy_labs._clear_cache import clear_cache @log @@ -62,3 +65,58 @@ def evaluate_dax_impersonation( df = pd.DataFrame(data_rows, columns=column_names) return df + + +def analyze_dax( + dataset: str, + dax_queries: dict, + workspace: Optional[str] = None, + rest_time: int = 2, + clear_cache_before_run: bool = False, + clear_cache_before_each_query: bool = False, +): + + if workspace is None: + workspace = fabric.resolve_workspace_name() + + base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"] + begin_cols = base_cols + ["StartTime"] + end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"] + + event_schema = { + "VertiPaqSEQueryBegin": begin_cols, + "VertiPaqSEQueryEnd": end_cols, + "VertiPaqSEQueryCacheMatch": base_cols, + "QueryBegin": begin_cols + ["ApplicationName"], + "QueryEnd": end_cols + ["ApplicationName"], + } + + if clear_cache_before_run: + clear_cache(dataset=dataset, workspace=workspace) + + # Establish trace connection + with fabric.create_trace_connection(dataset=dataset, workspace=workspace) as trace_connection: + with trace_connection.create_trace(event_schema) as trace: + trace.start() + # Loop through DAX queries + for i, (name, dax) in enumerate(dax_queries.items()): + # Clear cache for each query but not if done already before the run began + if clear_cache_before_each_query and not (i == 0 and clear_cache_before_run): + clear_cache(dataset=dataset, workspace=workspace) + + fabric.evaluate_dax(dataset=dataset, workspace=workspace, dax_string=dax) + time.sleep(rest_time) + print(f"{icons.green_dot} The '{name}' query has completed.") + + df = trace.stop() + time.sleep(5) + + # Name queries per dictionary + query_names = list(dax_queries.keys()) + query_begin = df['Event Class'] == 'QueryBegin' + df['Query Name'] = (query_begin).cumsum() + df['Query Name'] = df['Query Name'].where(query_begin, None).ffill() + df['Query Name'] = pd.to_numeric(df['Query Name'], downcast='integer') + df['Query Name'] = df['Query Name'].map(lambda x: query_names[x - 1]) + + return df From ec5645aa0c10c25db383fe76dd023f006b2141ea Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 30 Oct 2024 11:59:23 +0100 Subject: [PATCH 02/24] updated trace_dax --- src/sempy_labs/__init__.py | 6 +++- src/sempy_labs/_dax.py | 56 +++++++++++++++++++++++++++++++------- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py index 71866616..c56511a8 100644 --- a/src/sempy_labs/__init__.py +++ b/src/sempy_labs/__init__.py @@ -135,7 +135,10 @@ # create_connection_vnet, # create_connection_on_prem ) -from sempy_labs._dax import evaluate_dax_impersonation +from sempy_labs._dax import ( + 
evaluate_dax_impersonation, + trace_dax, +) from sempy_labs._generate_semantic_model import ( create_blank_semantic_model, create_semantic_model_from_bim, @@ -373,4 +376,5 @@ "revoke_external_data_share", "migrate_fabric_trial_capacity", "create_resource_group", + "trace_dax", ] diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index ef85ea6b..54790165 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -67,14 +67,43 @@ def evaluate_dax_impersonation( return df -def analyze_dax( +def trace_dax( dataset: str, dax_queries: dict, - workspace: Optional[str] = None, rest_time: int = 2, clear_cache_before_run: bool = False, clear_cache_before_each_query: bool = False, + workspace: Optional[str] = None, ): + """ + Runs a SQL Profiler trace over a set of DAX queries. + + Parameters + ---------- + dataset : str + Name of the semantic model. + dax_queries : dict + The dax queries to run in a dictionary format. Here is an example: + { + "Sales Amount Test", """ """ EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount]) """ """, + "Order Quantity with Product", """ """ EVALUATE SUMMARIZECOLUMNS('Product'[Color], "Order Qty", [Order Qty]) """ """, + } + rest_time : int, default=2 + Rest time (in seconds) between the execution of each DAX query. + clear_cache_before_run : bool, default=False + If True, clears the cache before running any DAX queries. + clear_cache_before_each_query : bool, default=False + If True, clears the cache before running each DAX query. + workspace : str, default=None + The Fabric workspace name. + Defaults to None which resolves to the workspace of the attached lakehouse + or if no lakehouse attached, resolves to the workspace of the notebook. + + Returns + ------- + pandas.DataFrame + A pandas dataframe showing the SQL profiler trace results of the DAX queries. 
+ """ if workspace is None: workspace = fabric.resolve_workspace_name() @@ -95,28 +124,35 @@ def analyze_dax( clear_cache(dataset=dataset, workspace=workspace) # Establish trace connection - with fabric.create_trace_connection(dataset=dataset, workspace=workspace) as trace_connection: + with fabric.create_trace_connection( + dataset=dataset, workspace=workspace + ) as trace_connection: with trace_connection.create_trace(event_schema) as trace: trace.start() # Loop through DAX queries for i, (name, dax) in enumerate(dax_queries.items()): # Clear cache for each query but not if done already before the run began - if clear_cache_before_each_query and not (i == 0 and clear_cache_before_run): + if clear_cache_before_each_query and not ( + i == 0 and clear_cache_before_run + ): clear_cache(dataset=dataset, workspace=workspace) - fabric.evaluate_dax(dataset=dataset, workspace=workspace, dax_string=dax) + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) time.sleep(rest_time) print(f"{icons.green_dot} The '{name}' query has completed.") df = trace.stop() + # Allow time to collect trace results time.sleep(5) # Name queries per dictionary query_names = list(dax_queries.keys()) - query_begin = df['Event Class'] == 'QueryBegin' - df['Query Name'] = (query_begin).cumsum() - df['Query Name'] = df['Query Name'].where(query_begin, None).ffill() - df['Query Name'] = pd.to_numeric(df['Query Name'], downcast='integer') - df['Query Name'] = df['Query Name'].map(lambda x: query_names[x - 1]) + query_begin = df["Event Class"] == "QueryBegin" + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) return df From 19772236975dc2b947e406fb466aa26af393d694 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 3 Nov 2024 14:52:39 +0200 Subject: [PATCH 03/24] output tuple --- src/sempy_labs/_dax.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 54790165..f6b07669 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -1,6 +1,6 @@ import sempy.fabric as fabric import pandas as pd -from typing import Optional +from typing import Optional, Tuple from sempy._utils._log import log import time import sempy_labs._icons as icons @@ -74,7 +74,7 @@ def trace_dax( clear_cache_before_run: bool = False, clear_cache_before_each_query: bool = False, workspace: Optional[str] = None, -): +) -> Tuple[pd.DataFrame, dict]: """ Runs a SQL Profiler trace over a set of DAX queries. @@ -101,8 +101,9 @@ def trace_dax( Returns ------- - pandas.DataFrame + Tuple[pandas.DataFrame, dict] A pandas dataframe showing the SQL profiler trace results of the DAX queries. + A dictionary of the query results in pandas dataframes. 
""" if workspace is None: @@ -120,6 +121,8 @@ def trace_dax( "QueryEnd": end_cols + ["ApplicationName"], } + query_results = {} + if clear_cache_before_run: clear_cache(dataset=dataset, workspace=workspace) @@ -137,9 +140,12 @@ def trace_dax( ): clear_cache(dataset=dataset, workspace=workspace) - fabric.evaluate_dax( + result = fabric.evaluate_dax( dataset=dataset, workspace=workspace, dax_string=dax ) + # Add results to output + query_results[name] = result + time.sleep(rest_time) print(f"{icons.green_dot} The '{name}' query has completed.") @@ -155,4 +161,4 @@ def trace_dax( df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) - return df + return df, query_results From 24aab5f3167d2a28076e0185e6885ec2dd7f37ce Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 4 Nov 2024 10:01:17 +0200 Subject: [PATCH 04/24] added new parameters to trace_dax --- src/sempy_labs/_dax.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index f6b07669..fb1fc0aa 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -73,6 +73,8 @@ def trace_dax( rest_time: int = 2, clear_cache_before_run: bool = False, clear_cache_before_each_query: bool = False, + trace_vertipaq_se: bool = False, + trace_direct_query: bool = False, workspace: Optional[str] = None, ) -> Tuple[pd.DataFrame, dict]: """ @@ -94,6 +96,10 @@ def trace_dax( If True, clears the cache before running any DAX queries. clear_cache_before_each_query : bool, default=False If True, clears the cache before running each DAX query. + trace_vertipaq_se : bool, default=False + If True, adds the following events to the trace: VertiPaq SE Query Begin, VertiPaq SE Query End, VertiPaq SE Query Cache Match + trace_direct_query : bool, default=False + If True, adds the following events to the trace: Direct Query Begin, Direct Query End workspace : str, default=None The Fabric workspace name. 
Defaults to None which resolves to the workspace of the attached lakehouse @@ -112,15 +118,21 @@ def trace_dax( base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"] begin_cols = base_cols + ["StartTime"] end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"] + dq_cols = ["EventClass", "CurrentTime", "StartTime", "EndTime", "Duration", "CpuTime", "Success", "Error", "TextData"] event_schema = { - "VertiPaqSEQueryBegin": begin_cols, - "VertiPaqSEQueryEnd": end_cols, - "VertiPaqSEQueryCacheMatch": base_cols, "QueryBegin": begin_cols + ["ApplicationName"], "QueryEnd": end_cols + ["ApplicationName"], } + if trace_vertipaq_se: + event_schema["VertiPaqSEQueryBegin"] = begin_cols + event_schema["VertiPaqSEQueryEnd"] = end_cols + event_schema["VertiPaqSEQueryCacheMatch"] = base_cols + if trace_direct_query: + event_schema["DirectQueryBegin"] = dq_cols + event_schema["DirectQueryEnd"] = dq_cols + query_results = {} if clear_cache_before_run: From 0e9c0306db51003ce1a7ec19492b7e536b702a70 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 7 Nov 2024 08:59:18 +0200 Subject: [PATCH 05/24] updated notebook --- notebooks/Model Optimization.ipynb | 490 ++++++++++++++++++++++++++++- 1 file changed, 489 insertions(+), 1 deletion(-) diff --git a/notebooks/Model Optimization.ipynb b/notebooks/Model Optimization.ipynb index 45ab4d00..2908141c 100644 --- a/notebooks/Model Optimization.ipynb +++ b/notebooks/Model Optimization.ipynb @@ -1 +1,489 @@ -{"cells":[{"cell_type":"markdown","id":"5c27dfd1-4fe0-4a97-92e6-ddf78889aa93","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["### Install the latest .whl package\n","\n","Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version."]},{"cell_type":"code","execution_count":null,"id":"d5cae9db-cef9-48a8-a351-9c5fcc99645c","metadata":{"jupyter":{"outputs_hidden":true,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%pip install semantic-link-labs"]},{"cell_type":"markdown","id":"cd8de5a0","metadata":{},"source":["### Import the library"]},{"cell_type":"code","execution_count":null,"id":"5cc6eedf","metadata":{},"outputs":[],"source":["import sempy_labs as labs\n","from sempy_labs import lakehouse as lake\n","from sempy_labs import directlake\n","import sempy_labs.report as rep\n","\n","dataset_name = ''\n","workspace_name = None"]},{"cell_type":"markdown","id":"5a3fe6e8-b8aa-4447-812b-7931831e07fe","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["### Vertipaq Analyzer"]},{"cell_type":"code","execution_count":null,"id":"cde43b47-4ecc-46ae-9125-9674819c7eab","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["labs.vertipaq_analyzer(dataset = dataset_name, workspace = workspace_name)"]},{"cell_type":"markdown","id":"419a348f","metadata":{},"source":["Export the Vertipaq Analyzer results to a .zip file in your lakehouse"]},{"cell_type":"code","execution_count":null,"id":"8aa239b3","metadata":{},"outputs":[],"source":["labs.vertipaq_analyzer(dataset = dataset_name, workspace = workspace_name, export = 'zip')"]},{"cell_type":"markdown","id":"2dce0f4f","metadata":{},"source":["Export the Vertipaq Analyzer results to append to delta tables in your lakehouse."]},{"cell_type":"code","execution_count":null,"id":"aef93fc8","metadata":{},"outputs":[],"source":["labs.vertipaq_analyzer(dataset = dataset_name, workspace = workspace_name, export = 
'table')"]},{"cell_type":"markdown","id":"1c62a802","metadata":{},"source":["Visualize the contents of an exported Vertipaq Analzyer .zip file."]},{"cell_type":"code","execution_count":null,"id":"9e349954","metadata":{},"outputs":[],"source":["labs.import_vertipaq_analyzer(folder_path = '', file_name = '')"]},{"cell_type":"markdown","id":"456ce0ff","metadata":{},"source":["### Best Practice Analzyer\n","\n","This runs the [standard rules](https://github.com/microsoft/Analysis-Services/tree/master/BestPracticeRules) for semantic models posted on Microsoft's GitHub."]},{"cell_type":"code","execution_count":null,"id":"0a3616b5-566e-414e-a225-fb850d6418dc","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["labs.run_model_bpa(dataset = dataset_name, workspace = workspace_name)"]},{"cell_type":"markdown","id":"6fb32a58","metadata":{},"source":["This runs the Best Practice Analyzer and exports the results to the 'modelbparesults' delta table in your Fabric lakehouse."]},{"cell_type":"code","execution_count":null,"id":"677851c3","metadata":{},"outputs":[],"source":["labs.run_model_bpa(dataset = dataset_name, workspace = workspace_name, export = True)"]},{"cell_type":"markdown","id":"64968a31","metadata":{},"source":["This runs the Best Practice Analyzer with the rules translated into Italian."]},{"cell_type":"code","execution_count":null,"id":"3c7d89e2","metadata":{},"outputs":[],"source":["labs.run_model_bpa(dataset = dataset_name, workspace = workspace_name, language = 'it-IT')"]},{"cell_type":"markdown","id":"255c30bb","metadata":{},"source":["
\n","Note: For analyzing model BPA results at scale, see the Best Practice Analyzer Report notebook (link below).\n","
\n","\n","[Best Practice Analyzer Notebook](https://github.com/microsoft/semantic-link-labs/blob/main/notebooks/Best%20Practice%20Analyzer%20Report.ipynb)"]},{"cell_type":"markdown","id":"bab18a61","metadata":{},"source":["### Run BPA using your own best practice rules"]},{"cell_type":"code","execution_count":null,"id":"59b89387","metadata":{},"outputs":[],"source":["import sempy\n","sempy.fabric._client._utils._init_analysis_services()\n","import Microsoft.AnalysisServices.Tabular as TOM\n","import pandas as pd\n","\n","dataset_name = ''\n","workspace_name = ''\n","\n","rules = pd.DataFrame(\n"," [\n"," (\n"," \"Performance\",\n"," \"Table\",\n"," \"Warning\",\n"," \"Rule name...\",\n"," lambda obj, tom: tom.is_calculated_table(table_name=obj.Name),\n"," 'Rule description...',\n"," '',\n"," ),\n"," (\n"," \"Performance\",\n"," \"Column\",\n"," \"Warning\",\n"," \"Do not use floating point data types\",\n"," lambda obj, tom: obj.DataType == TOM.DataType.Double,\n"," 'The \"Double\" floating point data type should be avoided, as it can result in unpredictable roundoff errors and decreased performance in certain scenarios. Use \"Int64\" or \"Decimal\" where appropriate (but note that \"Decimal\" is limited to 4 digits after the decimal sign).',\n"," )\n"," ],\n"," columns=[\n"," \"Category\",\n"," \"Scope\",\n"," \"Severity\",\n"," \"Rule Name\",\n"," \"Expression\",\n"," \"Description\",\n"," \"URL\",\n"," ],\n",")\n","\n","labs.run_model_bpa(dataset=dataset_name, workspace=workspace_name, rules=rules)"]},{"cell_type":"markdown","id":"8126a1a1","metadata":{},"source":["### Direct Lake\n","\n","Check if any lakehouse tables will hit the [Direct Lake guardrails](https://learn.microsoft.com/power-bi/enterprise/directlake-overview#fallback)."]},{"cell_type":"code","execution_count":null,"id":"e7397b15","metadata":{},"outputs":[],"source":["lake.get_lakehouse_tables(lakehouse = None, workspace = None, extended = True, count_rows = False)"]},{"cell_type":"code","execution_count":null,"id":"b30074cf","metadata":{},"outputs":[],"source":["lake.get_lakehouse_tables(lakehouse = None, workspace = None, extended = True, count_rows = False, export = True)"]},{"cell_type":"markdown","id":"99b84f2b","metadata":{},"source":["Check if any tables in a Direct Lake semantic model will fall back to DirectQuery."]},{"cell_type":"code","execution_count":null,"id":"f837be58","metadata":{},"outputs":[],"source":["directlake.check_fallback_reason(dataset = dataset_name, workspace = workspace_name)"]},{"cell_type":"markdown","id":"8f6df93e","metadata":{},"source":["### [OPTIMIZE](https://docs.delta.io/latest/optimizations-oss.html) your lakehouse delta tables."]},{"cell_type":"code","execution_count":null,"id":"e0262c9e","metadata":{},"outputs":[],"source":["lake.optimize_lakehouse_tables(tables = ['', ''], lakehouse = None, workspace = None)"]},{"cell_type":"markdown","id":"0091d6a0","metadata":{},"source":["Refresh/reframe your Direct Lake semantic model and restore the columns which were in memory prior to the refresh."]},{"cell_type":"code","execution_count":null,"id":"77eef082","metadata":{},"outputs":[],"source":["directlake.warm_direct_lake_cache_isresident(dataset = dataset_name, workspace = workspace_name)"]},{"cell_type":"markdown","id":"dae1a210","metadata":{},"source":["Ensure a warm cache for your users by putting the columns of a Direct Lake semantic model into memory based on the contents of a 
[perspective](https://learn.microsoft.com/analysis-services/tabular-models/perspectives-ssas-tabular?view=asallproducts-allversions).\n","\n","Perspectives can be created either in [Tabular Editor 3](https://github.com/TabularEditor/TabularEditor3/releases/latest) or in [Tabular Editor 2](https://github.com/TabularEditor/TabularEditor/releases/latest) using the [Perspective Editor](https://www.elegantbi.com/post/perspectiveeditor)."]},{"cell_type":"code","execution_count":null,"id":"43297001","metadata":{},"outputs":[],"source":["directlake.warm_direct_lake_cache_perspective(dataset = dataset_name, workspace = workspace_name, perspective = '', add_dependencies = True)"]}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python"},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5} +{ + "cells": [ + { + "cell_type": "markdown", + "id": "5c27dfd1-4fe0-4a97-92e6-ddf78889aa93", + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "### Install the latest .whl package\n", + "\n", + "Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d5cae9db-cef9-48a8-a351-9c5fcc99645c", + "metadata": { + "jupyter": { + "outputs_hidden": true, + "source_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "%pip install semantic-link-labs" + ] + }, + { + "cell_type": "markdown", + "id": "cd8de5a0", + "metadata": {}, + "source": [ + "### Import the library" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5cc6eedf", + "metadata": {}, + "outputs": [], + "source": [ + "import sempy_labs as labs\n", + "from sempy_labs import lakehouse as lake\n", + "from sempy_labs import directlake\n", + "import sempy_labs.report as rep\n", + "\n", + "dataset_name = ''\n", + "workspace_name = None" + ] + }, + { + "cell_type": "markdown", + "id": "5a3fe6e8-b8aa-4447-812b-7931831e07fe", + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "### Vertipaq Analyzer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cde43b47-4ecc-46ae-9125-9674819c7eab", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "labs.vertipaq_analyzer(dataset = dataset_name, workspace = workspace_name)" + ] + }, + { + "cell_type": "markdown", + "id": "419a348f", + "metadata": {}, + "source": [ + "Export the Vertipaq Analyzer results to a .zip file in your lakehouse" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8aa239b3", + "metadata": {}, + "outputs": [], + "source": [ + "labs.vertipaq_analyzer(dataset = dataset_name, workspace = workspace_name, export = 'zip')" + ] + }, + { + "cell_type": "markdown", + "id": "2dce0f4f", + "metadata": {}, + "source": [ + "Export the Vertipaq Analyzer results to append to delta tables in your lakehouse." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aef93fc8", + "metadata": {}, + "outputs": [], + "source": [ + "labs.vertipaq_analyzer(dataset = dataset_name, workspace = workspace_name, export = 'table')" + ] + }, + { + "cell_type": "markdown", + "id": "1c62a802", + "metadata": {}, + "source": [ + "Visualize the contents of an exported Vertipaq Analzyer .zip file." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9e349954", + "metadata": {}, + "outputs": [], + "source": [ + "labs.import_vertipaq_analyzer(folder_path = '', file_name = '')" + ] + }, + { + "cell_type": "markdown", + "id": "456ce0ff", + "metadata": {}, + "source": [ + "### Best Practice Analzyer\n", + "\n", + "This runs the [standard rules](https://github.com/microsoft/Analysis-Services/tree/master/BestPracticeRules) for semantic models posted on Microsoft's GitHub." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0a3616b5-566e-414e-a225-fb850d6418dc", + "metadata": { + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "labs.run_model_bpa(dataset = dataset_name, workspace = workspace_name)" + ] + }, + { + "cell_type": "markdown", + "id": "6fb32a58", + "metadata": {}, + "source": [ + "This runs the Best Practice Analyzer and exports the results to the 'modelbparesults' delta table in your Fabric lakehouse." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "677851c3", + "metadata": {}, + "outputs": [], + "source": [ + "labs.run_model_bpa(dataset = dataset_name, workspace = workspace_name, export = True)" + ] + }, + { + "cell_type": "markdown", + "id": "64968a31", + "metadata": {}, + "source": [ + "This runs the Best Practice Analyzer with the rules translated into Italian." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3c7d89e2", + "metadata": {}, + "outputs": [], + "source": [ + "labs.run_model_bpa(dataset = dataset_name, workspace = workspace_name, language = 'it-IT')" + ] + }, + { + "cell_type": "markdown", + "id": "255c30bb", + "metadata": {}, + "source": [ + "
\n", + "Note: For analyzing model BPA results at scale, see the Best Practice Analyzer Report notebook (link below).\n", + "
\n", + "\n", + "[Best Practice Analyzer Notebook](https://github.com/microsoft/semantic-link-labs/blob/main/notebooks/Best%20Practice%20Analyzer%20Report.ipynb)" + ] + }, + { + "cell_type": "markdown", + "id": "bab18a61", + "metadata": {}, + "source": [ + "### Run BPA using your own best practice rules" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59b89387", + "metadata": {}, + "outputs": [], + "source": [ + "import sempy\n", + "sempy.fabric._client._utils._init_analysis_services()\n", + "import Microsoft.AnalysisServices.Tabular as TOM\n", + "import pandas as pd\n", + "\n", + "dataset_name = ''\n", + "workspace_name = ''\n", + "\n", + "rules = pd.DataFrame(\n", + " [\n", + " (\n", + " \"Performance\",\n", + " \"Table\",\n", + " \"Warning\",\n", + " \"Rule name...\",\n", + " lambda obj, tom: tom.is_calculated_table(table_name=obj.Name),\n", + " 'Rule description...',\n", + " '',\n", + " ),\n", + " (\n", + " \"Performance\",\n", + " \"Column\",\n", + " \"Warning\",\n", + " \"Do not use floating point data types\",\n", + " lambda obj, tom: obj.DataType == TOM.DataType.Double,\n", + " 'The \"Double\" floating point data type should be avoided, as it can result in unpredictable roundoff errors and decreased performance in certain scenarios. Use \"Int64\" or \"Decimal\" where appropriate (but note that \"Decimal\" is limited to 4 digits after the decimal sign).',\n", + " )\n", + " ],\n", + " columns=[\n", + " \"Category\",\n", + " \"Scope\",\n", + " \"Severity\",\n", + " \"Rule Name\",\n", + " \"Expression\",\n", + " \"Description\",\n", + " \"URL\",\n", + " ],\n", + ")\n", + "\n", + "labs.run_model_bpa(dataset=dataset_name, workspace=workspace_name, rules=rules)" + ] + }, + { + "cell_type": "markdown", + "id": "d5933de1", + "metadata": {}, + "source": [ + "### Tracing\n", + "\n", + "Trace a set of DAX queries and capture the result of the DAX queries in dataframes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d04c58f", + "metadata": {}, + "outputs": [], + "source": [ + "trace_result, query_result = labs.trace_dax(\n", + " dataset = dataset_name,\n", + " dax_queries = {\n", + " \"Query1\": \"\"\" EVALUATE SUMMARIZECOLUMNS('DimProduct'[Color], \"1\", [Sales Amount]) \"\"\",\n", + " \"Query2\": \"\"\" EVALUATE SUMMARIZECOLUMNS(\"1\", [Sales Amount]) \"\"\",\n", + " },\n", + " workspace = workspace_name,\n", + " clear_cache_before_run=False,\n", + " clear_cache_before_each_query=False,\n", + " rest_time=2,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "f2586a1b", + "metadata": {}, + "source": [ + "Show the trace results of the DAX queries in a dataframe." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e49d9043", + "metadata": {}, + "outputs": [], + "source": [ + "trace_result\n", + "#trace_result.sort_values(by='Duration', ascending=False) # Order the trace results by the highest duration queries." + ] + }, + { + "cell_type": "markdown", + "id": "0d42a984", + "metadata": {}, + "source": [ + "Show the result of each DAX query in its own dataframe." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1df254c6", + "metadata": {}, + "outputs": [], + "source": [ + "for query, query_value in query_result.items():\n", + " print(query)\n", + " display(query_value)" + ] + }, + { + "cell_type": "markdown", + "id": "8126a1a1", + "metadata": {}, + "source": [ + "### Direct Lake\n", + "\n", + "Check if any lakehouse tables will hit the [Direct Lake guardrails](https://learn.microsoft.com/power-bi/enterprise/directlake-overview#fallback)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e7397b15", + "metadata": {}, + "outputs": [], + "source": [ + "lake.get_lakehouse_tables(lakehouse = None, workspace = None, extended = True, count_rows = False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b30074cf", + "metadata": {}, + "outputs": [], + "source": [ + "lake.get_lakehouse_tables(lakehouse = None, workspace = None, extended = True, count_rows = False, export = True)" + ] + }, + { + "cell_type": "markdown", + "id": "99b84f2b", + "metadata": {}, + "source": [ + "Check if any tables in a Direct Lake semantic model will fall back to DirectQuery." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f837be58", + "metadata": {}, + "outputs": [], + "source": [ + "directlake.check_fallback_reason(dataset = dataset_name, workspace = workspace_name)" + ] + }, + { + "cell_type": "markdown", + "id": "8f6df93e", + "metadata": {}, + "source": [ + "### [OPTIMIZE](https://docs.delta.io/latest/optimizations-oss.html) your lakehouse delta tables." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e0262c9e", + "metadata": {}, + "outputs": [], + "source": [ + "lake.optimize_lakehouse_tables(tables = ['', ''], lakehouse = None, workspace = None)" + ] + }, + { + "cell_type": "markdown", + "id": "0091d6a0", + "metadata": {}, + "source": [ + "Refresh/reframe your Direct Lake semantic model and restore the columns which were in memory prior to the refresh." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "77eef082", + "metadata": {}, + "outputs": [], + "source": [ + "directlake.warm_direct_lake_cache_isresident(dataset = dataset_name, workspace = workspace_name)" + ] + }, + { + "cell_type": "markdown", + "id": "dae1a210", + "metadata": {}, + "source": [ + "Ensure a warm cache for your users by putting the columns of a Direct Lake semantic model into memory based on the contents of a [perspective](https://learn.microsoft.com/analysis-services/tabular-models/perspectives-ssas-tabular?view=asallproducts-allversions).\n", + "\n", + "Perspectives can be created either in [Tabular Editor 3](https://github.com/TabularEditor/TabularEditor3/releases/latest) or in [Tabular Editor 2](https://github.com/TabularEditor/TabularEditor/releases/latest) using the [Perspective Editor](https://www.elegantbi.com/post/perspectiveeditor)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "43297001", + "metadata": {}, + "outputs": [], + "source": [ + "directlake.warm_direct_lake_cache_perspective(dataset = dataset_name, workspace = workspace_name, perspective = '', add_dependencies = True)" + ] + } + ], + "metadata": { + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Synapse PySpark", + "language": "Python", + "name": "synapse_pyspark" + }, + "language_info": { + "name": "python" + }, + "microsoft": { + "language": "python" + }, + "nteract": { + "version": "nteract-front-end@1.0.0" + }, + "spark_compute": { + "compute_id": "/trident/default" + }, + "synapse_widget": { + "state": {}, + "version": "0.1" + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 1ae410486f5e753b5e32a24cef442f96b72f74f7 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 9 Nov 2024 22:05:46 +0200 Subject: [PATCH 06/24] added trace_dax_warm --- src/sempy_labs/_dax.py | 171 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 170 insertions(+), 1 deletion(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index fb1fc0aa..2b3ac3b0 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -1,3 +1,4 @@ +import sempy import sempy.fabric as fabric import pandas as pd from typing import Optional, Tuple @@ -118,7 +119,17 @@ def trace_dax( base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"] begin_cols = base_cols + ["StartTime"] end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"] - dq_cols = ["EventClass", "CurrentTime", "StartTime", "EndTime", "Duration", "CpuTime", "Success", "Error", "TextData"] + dq_cols = [ + "EventClass", + "CurrentTime", + "StartTime", + "EndTime", + "Duration", + "CpuTime", + "Success", + "Error", + "TextData", + ] event_schema = { "QueryBegin": begin_cols + ["ApplicationName"], @@ -174,3 +185,161 @@ def trace_dax( df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) return df, query_results + + +@log +def trace_dax_warm( + dataset: str, + dax_queries: dict, + rest_time: int = 2, + workspace: Optional[str] = None, +) -> Tuple[pd.DataFrame, dict]: + """ + Runs a warm cache test against a single or set of DAX queries. Valid for import-only or Direct Lake semantic models. + + Parameters + ---------- + dataset : str + Name of the semantic model. + dax_queries : dict + The dax queries to run in a dictionary format. Here is an example: + { + "Sales Amount Test", """ """ EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount]) """ """, + "Order Quantity with Product", """ """ EVALUATE SUMMARIZECOLUMNS('Product'[Color], "Order Qty", [Order Qty]) """ """, + } + rest_time : int, default=2 + Rest time (in seconds) between the execution of each DAX query. + workspace : str, default=None + The Fabric workspace name. + Defaults to None which resolves to the workspace of the attached lakehouse + or if no lakehouse attached, resolves to the workspace of the notebook. + + Returns + ------- + Tuple[pandas.DataFrame, dict] + A pandas dataframe showing the SQL profiler trace results of the DAX queries. + A dictionary of the query results in pandas dataframes. 
+ """ + + if workspace is None: + workspace = fabric.resolve_workspace_name() + + from sempy_labs.tom import connect_semantic_model + from sempy_labs._refresh_semantic_model import refresh_semantic_model + + sempy.fabric._client._utils._init_analysis_services() + import Microsoft.AnalysisServices.Tabular as TOM + + dl_tables = [] + with connect_semantic_model( + dataset=dataset, workspace=workspace, readonly=True + ) as tom: + for p in tom.all_partitions(): + if p.Mode == TOM.ModeType.DirectLake: + dl_tables.append(p.Parent.Name) + elif p.Mode == TOM.ModeType.DirectQuery or ( + p.Mode == TOM.ModeType.Default + and tom.model.Model.DefaultMode == TOM.ModeType.DirectQuery + ): + raise ValueError( + f"{icons.red_dot} This testing is only for Import & Direct Lake semantic models." + ) + + base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"] + begin_cols = base_cols + ["StartTime"] + end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"] + + event_schema = { + "QueryBegin": begin_cols + ["ApplicationName"], + "QueryEnd": end_cols + ["ApplicationName"], + } + + event_schema["VertiPaqSEQueryBegin"] = begin_cols + event_schema["VertiPaqSEQueryEnd"] = end_cols + event_schema["VertiPaqSEQueryCacheMatch"] = base_cols + + query_results = {} + evaluate_one = """ EVALUATE {1}""" + + # Establish trace connection + with fabric.create_trace_connection( + dataset=dataset, workspace=workspace + ) as trace_connection: + with trace_connection.create_trace(event_schema) as trace: + trace.start() + # Loop through DAX queries + for i, (name, dax) in enumerate(dax_queries.items()): + + if dl_tables: + # Process Clear + refresh_semantic_model( + dataset=dataset, + workspace=workspace, + refresh_type="clearValues", + tables=dl_tables, + ) + # Process Full + refresh_semantic_model( + dataset=dataset, workspace=workspace, refresh_type="full" + ) + # Evaluate {1} + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=evaluate_one + ) + # Run DAX Query + result = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + else: + # Run DAX Query + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + # Clear Cache + clear_cache(dataset=dataset, workspace=workspace) + # Evaluate {1} + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=evaluate_one + ) + # Run DAX Query + result = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + + # Add results to output + query_results[name] = result + + time.sleep(rest_time) + print(f"{icons.green_dot} The '{name}' query has completed.") + + df = trace.stop() + # Allow time to collect trace results + time.sleep(5) + + query_names = list(dax_queries.keys()) + query_begin = df["Event Class"] == "QueryBegin" + + if dl_tables: + # Filter out unnecessary operations + df = df[~df['Application Name'].isin(['PowerBI', 'PowerBIEIM']) & (~df['Text Data'].str.startswith('EVALUATE {1}'))] + + # Name queries per dictionary + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) + else: + # Filter out unnecessary operations + df = df[(~df['Text Data'].str.startswith('EVALUATE {1}'))] + # Name queries per dictionary + suffix = '_removeXXX' + query_names_full = [item for query in query_names for item in (f"{query}{suffix}", 
query)] + # Step 3: Assign query names by group and convert to integer + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + # Step 4: Map to full query names + df["Query Name"] = df["Query Name"].map(lambda x: query_names_full[x - 1]) + df = df[~df['Query Name'].str.endswith(suffix)] + + return df, query_results From 9c2be1d7a2b44acf0e2c3fe0e683cf9732e0ac0b Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 9 Nov 2024 22:42:36 +0200 Subject: [PATCH 07/24] fixed warm --- src/sempy_labs/_dax.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 2b3ac3b0..93c734bf 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -317,12 +317,11 @@ def trace_dax_warm( time.sleep(5) query_names = list(dax_queries.keys()) - query_begin = df["Event Class"] == "QueryBegin" if dl_tables: # Filter out unnecessary operations df = df[~df['Application Name'].isin(['PowerBI', 'PowerBIEIM']) & (~df['Text Data'].str.startswith('EVALUATE {1}'))] - + query_begin = df["Event Class"] == "QueryBegin" # Name queries per dictionary df["Query Name"] = (query_begin).cumsum() df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() @@ -331,6 +330,7 @@ def trace_dax_warm( else: # Filter out unnecessary operations df = df[(~df['Text Data'].str.startswith('EVALUATE {1}'))] + query_begin = df["Event Class"] == "QueryBegin" # Name queries per dictionary suffix = '_removeXXX' query_names_full = [item for query in query_names for item in (f"{query}{suffix}", query)] @@ -342,4 +342,6 @@ def trace_dax_warm( df["Query Name"] = df["Query Name"].map(lambda x: query_names_full[x - 1]) df = df[~df['Query Name'].str.endswith(suffix)] + df = df.reset_index(drop=True) + return df, query_results From a3cb779fc56f05423a6acb60cd419c4841a66b0f Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 14 Nov 2024 11:26:28 +0200 Subject: [PATCH 08/24] updated warm for dl --- src/sempy_labs/_dax.py | 108 +++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 93c734bf..414de9af 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -225,7 +225,6 @@ def trace_dax_warm( workspace = fabric.resolve_workspace_name() from sempy_labs.tom import connect_semantic_model - from sempy_labs._refresh_semantic_model import refresh_semantic_model sempy.fabric._client._utils._init_analysis_services() import Microsoft.AnalysisServices.Tabular as TOM @@ -267,44 +266,46 @@ def trace_dax_warm( ) as trace_connection: with trace_connection.create_trace(event_schema) as trace: trace.start() + print('trace started...') # Loop through DAX queries for i, (name, dax) in enumerate(dax_queries.items()): - if dl_tables: + # Cold Cache Direct Lake + #if dl_tables: # Process Clear - refresh_semantic_model( - dataset=dataset, - workspace=workspace, - refresh_type="clearValues", - tables=dl_tables, - ) + # refresh_semantic_model( + # dataset=dataset, + # workspace=workspace, + # refresh_type="clearValues", + # tables=dl_tables, + # ) # Process Full - refresh_semantic_model( - dataset=dataset, workspace=workspace, refresh_type="full" - ) + # refresh_semantic_model( + # dataset=dataset, workspace=workspace, refresh_type="full" + # ) # Evaluate {1} - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=evaluate_one - ) + # 
fabric.evaluate_dax( + # dataset=dataset, workspace=workspace, dax_string=evaluate_one + # ) # Run DAX Query - result = fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) - else: - # Run DAX Query - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) - # Clear Cache - clear_cache(dataset=dataset, workspace=workspace) - # Evaluate {1} - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=evaluate_one - ) - # Run DAX Query - result = fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) + # result = fabric.evaluate_dax( + # dataset=dataset, workspace=workspace, dax_string=dax + # ) + + # Run DAX Query + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + # Clear Cache + clear_cache(dataset=dataset, workspace=workspace) + # Evaluate {1} + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=evaluate_one + ) + # Run DAX Query + result = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) # Add results to output query_results[name] = result @@ -318,29 +319,30 @@ def trace_dax_warm( query_names = list(dax_queries.keys()) - if dl_tables: + # DL Cold Cache + #if dl_tables: # Filter out unnecessary operations - df = df[~df['Application Name'].isin(['PowerBI', 'PowerBIEIM']) & (~df['Text Data'].str.startswith('EVALUATE {1}'))] - query_begin = df["Event Class"] == "QueryBegin" + # df = df[~df['Application Name'].isin(['PowerBI', 'PowerBIEIM']) & (~df['Text Data'].str.startswith('EVALUATE {1}'))] + # query_begin = df["Event Class"] == "QueryBegin" # Name queries per dictionary - df["Query Name"] = (query_begin).cumsum() - df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() - df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") - df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) - else: - # Filter out unnecessary operations - df = df[(~df['Text Data'].str.startswith('EVALUATE {1}'))] - query_begin = df["Event Class"] == "QueryBegin" - # Name queries per dictionary - suffix = '_removeXXX' - query_names_full = [item for query in query_names for item in (f"{query}{suffix}", query)] - # Step 3: Assign query names by group and convert to integer - df["Query Name"] = (query_begin).cumsum() - df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() - df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") - # Step 4: Map to full query names - df["Query Name"] = df["Query Name"].map(lambda x: query_names_full[x - 1]) - df = df[~df['Query Name'].str.endswith(suffix)] + # df["Query Name"] = (query_begin).cumsum() + # df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + # df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + # df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) + + # Filter out unnecessary operations + df = df[(~df['Text Data'].str.startswith('EVALUATE {1}'))] + query_begin = df["Event Class"] == "QueryBegin" + # Name queries per dictionary + suffix = '_removeXXX' + query_names_full = [item for query in query_names for item in (f"{query}{suffix}", query)] + # Step 3: Assign query names by group and convert to integer + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + # Step 4: Map to full query names + df["Query Name"] = df["Query Name"].map(lambda 
x: query_names_full[x - 1]) + df = df[~df['Query Name'].str.endswith(suffix)] df = df.reset_index(drop=True) From 515c0d75faf2d80214df869314609655a2bc89f9 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 14 Nov 2024 14:01:11 +0200 Subject: [PATCH 09/24] added benchmark function --- src/sempy_labs/_dax.py | 456 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 442 insertions(+), 14 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 414de9af..c2cfafc4 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -1,6 +1,7 @@ import sempy import sempy.fabric as fabric import pandas as pd +import datetime from typing import Optional, Tuple from sempy._utils._log import log import time @@ -8,8 +9,13 @@ from sempy_labs._helper_functions import ( resolve_dataset_id, resolve_workspace_name_and_id, + _get_max_run_id, + resolve_lakehouse_name, + save_as_delta_table, ) +from sempy_labs.lakehouse._lakehouse import lakehouse_attached from sempy_labs._clear_cache import clear_cache +from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables @log @@ -68,6 +74,193 @@ def evaluate_dax_impersonation( return df +@log +def get_dax_query_dependencies( + dataset: str, + dax_string: str, + put_in_memory: bool = False, + workspace: Optional[str] = None, +) -> pd.DataFrame: + """ + Obtains the columns on which a DAX query depends, including model dependencies. Shows Vertipaq statistics (i.e. Total Size, Data Size, Dictionary Size, Hierarchy Size) for easy prioritizing. + + Parameters + ---------- + dataset : str + Name of the semantic model. + dax_string : str + The DAX query. + put_in_memory : bool, default=False + If True, ensures that the dependent columns are put into memory in order to give realistic Vertipaq stats (i.e. Total Size etc.). + workspace : str, default=None + The Fabric workspace name. + Defaults to None which resolves to the workspace of the attached lakehouse + or if no lakehouse attached, resolves to the workspace of the notebook. + + Returns + ------- + pandas.DataFrame + A pandas dataframe showing the dependent columns of a given DAX query including model dependencies. + """ + + if workspace is None: + workspace = fabric.resolve_workspace_name(workspace) + + # Escape quotes in dax + dax_string = dax_string.replace('"', '""') + final_query = f""" + EVALUATE + VAR source_query = "{dax_string}" + VAR all_dependencies = SELECTCOLUMNS( + INFO.CALCDEPENDENCY("QUERY", source_query), + "Referenced Object Type",[REFERENCED_OBJECT_TYPE], + "Referenced Table", [REFERENCED_TABLE], + "Referenced Object", [REFERENCED_OBJECT] + ) + RETURN all_dependencies + """ + dep = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=final_query + ) + + # Clean up column names and values (remove outside square brackets, underscorees in object type) + dep.columns = dep.columns.map(lambda x: x[1:-1]) + dep["Referenced Object Type"] = ( + dep["Referenced Object Type"].str.replace("_", " ").str.title() + ) + dep + + # Dataframe df will contain the output of all dependencies of the objects used in the query + df = dep.copy() + + cd = get_model_calc_dependencies(dataset=dataset, workspace=workspace) + + for _, r in dep.iterrows(): + ot = r["Referenced Object Type"] + object_name = r["Referenced Object"] + table_name = r["Referenced Table"] + cd_filt = cd[ + (cd["Object Type"] == ot) + & (cd["Object Name"] == object_name) + & (cd["Table Name"] == table_name) + ] + + # Adds in the dependencies of each object used in the query (i.e. 
relationship etc.) + if len(cd_filt) > 0: + subset = cd_filt[ + ["Referenced Object Type", "Referenced Table", "Referenced Object"] + ] + df = pd.concat([df, subset], ignore_index=True) + + df.columns = df.columns.map(lambda x: x.replace("Referenced ", "")) + # Remove duplicates + df = df.drop_duplicates().reset_index(drop=True) + # Only show columns and remove the rownumber column + df = df[ + (df["Object Type"].isin(["Column", "Calc Column"])) + & (~df["Object"].str.startswith("RowNumber-")) + ] + + # Get vertipaq stats, filter to just the objects in the df dataframe + df["Full Object"] = format_dax_object_name(df["Table"], df["Object"]) + dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True) + dfC["Full Object"] = format_dax_object_name(dfC["Table Name"], dfC["Column Name"]) + + dfC_filtered = dfC[dfC["Full Object"].isin(df["Full Object"].values)][ + [ + "Table Name", + "Column Name", + "Total Size", + "Data Size", + "Dictionary Size", + "Hierarchy Size", + "Is Resident", + "Full Object", + ] + ].reset_index(drop=True) + + if put_in_memory: + not_in_memory = dfC_filtered[dfC_filtered["Is Resident"] == False] + + if len(not_in_memory) > 0: + tbls = not_in_memory["Table Name"].unique() + + # Run basic query to get columns into memory; completed one table at a time (so as not to overload the capacity) + for table_name in (bar := tqdm(tbls)): + bar.set_description(f"Warming the '{table_name}' table...") + css = ", ".join( + not_in_memory[not_in_memory["Table Name"] == table_name][ + "Full Object" + ] + .astype(str) + .tolist() + ) + dax = f"""EVALUATE TOPN(1,SUMMARIZECOLUMNS({css}))""" + fabric.evaluate_dax( + dataset=dataset, dax_string=dax, workspace=workspace + ) + + # Get column stats again + dfC = fabric.list_columns( + dataset=dataset, workspace=workspace, extended=True + ) + dfC["Full Object"] = format_dax_object_name( + dfC["Table Name"], dfC["Column Name"] + ) + + dfC_filtered = dfC[dfC["Full Object"].isin(df["Full Object"].values)][ + [ + "Table Name", + "Column Name", + "Total Size", + "Data Size", + "Dictionary Size", + "Hierarchy Size", + "Is Resident", + "Full Object", + ] + ].reset_index(drop=True) + + dfC_filtered.drop(["Full Object"], axis=1, inplace=True) + + return dfC_filtered + + +@log +def get_dax_query_memory_size( + dataset: str, dax_string: str, workspace: Optional[str] = None +) -> int: + """ + Obtains the total size, in bytes, used by all columns that a DAX query depends on. + + Parameters + ---------- + dataset : str + Name of the semantic model. + dax_string : str + The DAX query. + workspace : str, default=None + The Fabric workspace name. + Defaults to None which resolves to the workspace of the attached lakehouse + or if no lakehouse attached, resolves to the workspace of the notebook. + + Returns + ------- + int + The total size, in bytes, used by all columns that the DAX query depends on. 
+ """ + + if workspace is None: + workspace = fabric.resolve_workspace_name(workspace) + + df = get_dax_query_dependencies( + dataset=dataset, workspace=workspace, dax_string=dax_string, put_in_memory=True + ) + + return df["Total Size"].sum() + + +@log def trace_dax( dataset: str, dax_queries: dict, @@ -266,28 +459,28 @@ def trace_dax_warm( ) as trace_connection: with trace_connection.create_trace(event_schema) as trace: trace.start() - print('trace started...') + print(f"{icons.in_progress} Starting performance testing...") # Loop through DAX queries for i, (name, dax) in enumerate(dax_queries.items()): # Cold Cache Direct Lake - #if dl_tables: - # Process Clear + # if dl_tables: + # Process Clear # refresh_semantic_model( # dataset=dataset, # workspace=workspace, # refresh_type="clearValues", # tables=dl_tables, # ) - # Process Full + # Process Full # refresh_semantic_model( # dataset=dataset, workspace=workspace, refresh_type="full" # ) - # Evaluate {1} + # Evaluate {1} # fabric.evaluate_dax( # dataset=dataset, workspace=workspace, dax_string=evaluate_one # ) - # Run DAX Query + # Run DAX Query # result = fabric.evaluate_dax( # dataset=dataset, workspace=workspace, dax_string=dax # ) @@ -320,30 +513,265 @@ def trace_dax_warm( query_names = list(dax_queries.keys()) # DL Cold Cache - #if dl_tables: - # Filter out unnecessary operations + # if dl_tables: + # Filter out unnecessary operations # df = df[~df['Application Name'].isin(['PowerBI', 'PowerBIEIM']) & (~df['Text Data'].str.startswith('EVALUATE {1}'))] # query_begin = df["Event Class"] == "QueryBegin" - # Name queries per dictionary + # Name queries per dictionary # df["Query Name"] = (query_begin).cumsum() # df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() # df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") # df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) - + # Filter out unnecessary operations - df = df[(~df['Text Data'].str.startswith('EVALUATE {1}'))] + df = df[(~df["Text Data"].str.startswith("EVALUATE {1}"))] query_begin = df["Event Class"] == "QueryBegin" # Name queries per dictionary - suffix = '_removeXXX' - query_names_full = [item for query in query_names for item in (f"{query}{suffix}", query)] + suffix = "_removeXXX" + query_names_full = [ + item for query in query_names for item in (f"{query}{suffix}", query) + ] # Step 3: Assign query names by group and convert to integer df["Query Name"] = (query_begin).cumsum() df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") # Step 4: Map to full query names df["Query Name"] = df["Query Name"].map(lambda x: query_names_full[x - 1]) - df = df[~df['Query Name'].str.endswith(suffix)] + df = df[~df["Query Name"].str.endswith(suffix)] df = df.reset_index(drop=True) return df, query_results + + +def run_benchmark(dataset: str, dax_queries: dict, workspace: Optional[str] = None): + + if workspace is None: + workspace = fabric.resolve_workspace_name() + + # Get RunId + table_name = "SLL_Measures" + + if not lakehouse_attached(): + raise ValueError( + f"{icons.red_dot} A lakehouse must be attached to the notebook." 
+ ) + + lakehouse_id = fabric.get_lakehouse_id() + lakehouse_workspace = fabric.resolve_workspace_name() + lakehouse_name = resolve_lakehouse_name(lakehouse_id, lakehouse_workspace) + + dfLT = get_lakehouse_tables(lakehouse_name, lakehouse_workspace) + dfLT_filt = dfLT[dfLT["Table Name"] == table_name] + if len(dfLT_filt) == 0: + run_id = 1 + else: + run_id = _get_max_run_id(lakehouse=lakehouse_name, table_name=table_name) + 1 + time_stamp = datetime.datetime.now() + + def add_cols(df, run_id, time_stamp): + df.insert(0, "Workspace Name", workspace) + df.insert(1, "Dataset Name", dataset) + df["RunId"] = run_id + df["Timestamp"] = time_stamp + + return df + + def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None): + + dfM = fabric.list_measures(dataset=dataset, workspace=workspace)[ + ["Table Name", "Measure Name", "Measure Expression"] + ] + dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True)[ + [ + "Table Name", + "Column Name", + "Type", + "Data Type", + "Column Cardinality", + "Total Size", + "Data Size", + "Dictionary Size", + "Hierarchy Size", + "Encoding", + ] + ] + dfC = dfC[dfC["Type"] != "RowNumber"] + dfT = fabric.list_tables(dataset=dataset, workspace=workspace, extended=True)[ + ["Name", "Type", "Row Count"] + ] + dfT = dfT.rename(columns={"Name": "Table Name"}) + dfR = fabric.list_relationships(dataset=dataset, workspace=workspace) + dfP = fabric.list_partitions( + dataset=dataset, workspace=workspace, extended=True + )[ + [ + "Table Name", + "Partition Name", + "Mode", + "Source Type", + "Query", + "Refreshed Time", + "Modified Time", + "Record Count", + "Records per Segment", + "Segment Count", + ] + ] + + dfM = add_cols(dfM, run_id, time_stamp) + dfC = add_cols(dfC, run_id, time_stamp) + dfT = add_cols(dfT, run_id, time_stamp) + dfR = add_cols(dfR, run_id, time_stamp) + dfP = add_cols(dfP, run_id, time_stamp) + + dfM_schema = { + "Workspace_Name": "string", + "Dataset_Name": "string", + "Table_Name": "string", + "Measure_Name": "string", + "Measure_Expression": "string", + "RunId": "long", + "Timestamp": "timestamp", + } + dfC_schema = { + "Workspace_Name": "string", + "Dataset_Name": "string", + "Table_Name": "string", + "Column_Name": "string", + "Type": "string", + "Data_Type": "string", + "Column_Cardinality": "long", + "Total_Size": "long", + "Data_Size": "long", + "Dictionary_Size": "long", + "Hierarchy_Size": "long", + "Encoding": "string", + "RunId": "long", + "Timestamp": "timestamp", + } + dfT_schema = { + "Workspace_Name": "string", + "Dataset_Name": "string", + "Table_Name": "string", + "Type": "string", + "Row_Count": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + dfP_schema = { + "Workspace_Name": "string", + "Dataset_Name": "string", + "Table_Name": "string", + "Partition_Name": "string", + "Mode": "string", + "Source_Type": "string", + "Query": "string", + "Refreshed_Time": "timestamp", + "Modified_Time": "timestamp", + "Record_Count": "long", + "Records_per_Segment": "double", + "Segment_Count": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + + dfs = { + "Measures": [dfM, dfM_schema], + "Columns": [dfC, dfC_schema], + "Tables": [dfT, dfT_schema], + "Relationships": [dfR, None], + "Partitions": [dfP, dfP_schema], + } + print(f"{icons.in_progress} Saving semantic model metadata...") + for name, (df, df_schema) in dfs.items(): + df.columns = df.columns.str.replace(" ", "_") + + save_as_delta_table( + dataframe=df, + delta_table_name=f"SLL_{name}", + write_mode="append", + 
schema=df_schema, + ) + + collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) + + # Run and save trace data + trace_result, query_result = trace_dax_warm( + dataset=dataset, workspace=workspace, dax_queries=dax_queries + ) + + trace_schema = { + "Workspace_Name": "string", + "Dataset_Name": "string", + "Query_Name": "string", + "Query_Text": "string", + "Duration": "long", + "SE_CPU": "long", + "SE_Queries": "long", + "Column_Dependencies": "str", + "Column_Dependencies_Size": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + df = pd.DataFrame(columns=list(trace_schema.keys())) # 'SE Duration' + + for query_name in trace_result["Query Name"].unique().tolist(): + df_trace = trace_result[trace_result["Query Name"] == query_name] + # Capture Query Text + query_begin = df_trace[df_trace["Event Class"] == "QueryBegin"] + query_text = query_begin["Text Data"].iloc[0] + + # Filter to only end events; filter out internal events + df_trace = df_trace[ + (~df_trace["Event Subclass"].str.endswith("Internal")) + & (df_trace["Event Class"].str.endswith("End")) + ] + + # Collect query dependencies + dep = get_dax_query_dependencies( + dataset=dataset, + workspace=workspace, + dax_string=query_text, + put_in_memory=False, + ) + total_size = dep["Total Size"].sum() + table_column_list = [ + f"'{table}'[{column}]" + for table, column in zip(dep["Table Name"], dep["Column Name"]) + ] + + new_data = { + "Workspace_Name": workspace, + "Dataset_Name": dataset, + "Query_Name": query_name, + "Query_Text": query_text, + "Duration": df_trace[df_trace["Event Class"] == "QueryEnd"][ + "Duration" + ].sum(), + "SE_CPU": df_trace["Cpu Time"].sum(), + "SE_Queries": len(df_trace) - 1, + "Column_Dependencies": str(table_column_list), + "Column_Dependencies_Size": total_size, + "RunId": run_id, + "Timestamp": time_stamp, + } + + if df.empty: + df = pd.DataFrame(new_data, index=[0]) + else: + df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True) + + save_as_delta_table( + dataframe=df, + delta_table_name="SLL_PerfBenchmark", + write_mode="append", + schema=trace_schema, + ) + + +# def analyze_benchmark_results(): +# """ +# Compares the perf results of the latest test with previous tests. Output is reason(s) why perf improved or degraded. +# """ +# print('hi') From d60af3519629c5519d121b5dc118aabbd058f825 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 15 Nov 2024 11:15:27 +0200 Subject: [PATCH 10/24] added cold cache option --- src/sempy_labs/_dax.py | 169 ++++++++++++++++++++++++----------------- 1 file changed, 100 insertions(+), 69 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index c2cfafc4..288747f4 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -381,14 +381,16 @@ def trace_dax( @log -def trace_dax_warm( +def dax_perf_test( dataset: str, dax_queries: dict, + cache_type: str = "warm", rest_time: int = 2, workspace: Optional[str] = None, ) -> Tuple[pd.DataFrame, dict]: """ - Runs a warm cache test against a single or set of DAX queries. Valid for import-only or Direct Lake semantic models. + Runs a warm/cold cache test against a single or set of DAX queries. Valid for import-only or Direct Lake semantic models. + Cold-cache testing is only available for Direct Lake semantic models. 
Parameters ---------- @@ -400,6 +402,8 @@ def trace_dax_warm( "Sales Amount Test", """ """ EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount]) """ """, "Order Quantity with Product", """ """ EVALUATE SUMMARIZECOLUMNS('Product'[Color], "Order Qty", [Order Qty]) """ """, } + cache_type : str, default="warm" + Allows testing for 'warm' or 'cold' cache scenarios. 'Cold' cache testing is only available for Direct Lake semantic models. rest_time : int, default=2 Rest time (in seconds) between the execution of each DAX query. workspace : str, default=None @@ -413,15 +417,22 @@ def trace_dax_warm( A pandas dataframe showing the SQL profiler trace results of the DAX queries. A dictionary of the query results in pandas dataframes. """ - - if workspace is None: - workspace = fabric.resolve_workspace_name() - from sempy_labs.tom import connect_semantic_model + from sempy_labs._refresh_semantic_model import refresh_semantic_model sempy.fabric._client._utils._init_analysis_services() import Microsoft.AnalysisServices.Tabular as TOM + if workspace is None: + workspace = fabric.resolve_workspace_name() + + cache_type = cache_type.lower() + cache_types = ["warm", "cold"] + if cache_type not in cache_types: + raise ValueError( + f"{icons.red_dot} Invalid cache type. Valid options: {cache_types}." + ) + dl_tables = [] with connect_semantic_model( dataset=dataset, workspace=workspace, readonly=True @@ -437,6 +448,11 @@ def trace_dax_warm( f"{icons.red_dot} This testing is only for Import & Direct Lake semantic models." ) + if cache_type != "warm" and not tom.is_direct_lake(): + raise ValueError( + f"{icons.red_dot} Cold cache testing is only available for Direct Lake semantic models." + ) + base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"] begin_cols = base_cols + ["StartTime"] end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"] @@ -464,41 +480,41 @@ def trace_dax_warm( for i, (name, dax) in enumerate(dax_queries.items()): # Cold Cache Direct Lake - # if dl_tables: - # Process Clear - # refresh_semantic_model( - # dataset=dataset, - # workspace=workspace, - # refresh_type="clearValues", - # tables=dl_tables, - # ) - # Process Full - # refresh_semantic_model( - # dataset=dataset, workspace=workspace, refresh_type="full" - # ) - # Evaluate {1} - # fabric.evaluate_dax( - # dataset=dataset, workspace=workspace, dax_string=evaluate_one - # ) - # Run DAX Query - # result = fabric.evaluate_dax( - # dataset=dataset, workspace=workspace, dax_string=dax - # ) - - # Run DAX Query - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) - # Clear Cache - clear_cache(dataset=dataset, workspace=workspace) - # Evaluate {1} - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=evaluate_one - ) - # Run DAX Query - result = fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) + if dl_tables and cache_type == "cold": + # Process Clear + refresh_semantic_model( + dataset=dataset, + workspace=workspace, + refresh_type="clearValues", + tables=dl_tables, + ) + # Process Full + refresh_semantic_model( + dataset=dataset, workspace=workspace, refresh_type="full" + ) + # Evaluate {1} + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=evaluate_one + ) + # Run DAX Query + result = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + else: + # Run DAX Query + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + # Clear 
Cache + clear_cache(dataset=dataset, workspace=workspace) + # Evaluate {1} + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=evaluate_one + ) + # Run DAX Query + result = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) # Add results to output query_results[name] = result @@ -513,38 +529,50 @@ def trace_dax_warm( query_names = list(dax_queries.keys()) # DL Cold Cache - # if dl_tables: - # Filter out unnecessary operations - # df = df[~df['Application Name'].isin(['PowerBI', 'PowerBIEIM']) & (~df['Text Data'].str.startswith('EVALUATE {1}'))] - # query_begin = df["Event Class"] == "QueryBegin" - # Name queries per dictionary - # df["Query Name"] = (query_begin).cumsum() - # df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() - # df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") - # df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) - - # Filter out unnecessary operations - df = df[(~df["Text Data"].str.startswith("EVALUATE {1}"))] - query_begin = df["Event Class"] == "QueryBegin" - # Name queries per dictionary - suffix = "_removeXXX" - query_names_full = [ - item for query in query_names for item in (f"{query}{suffix}", query) - ] - # Step 3: Assign query names by group and convert to integer - df["Query Name"] = (query_begin).cumsum() - df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() - df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") - # Step 4: Map to full query names - df["Query Name"] = df["Query Name"].map(lambda x: query_names_full[x - 1]) - df = df[~df["Query Name"].str.endswith(suffix)] + if dl_tables and cache_type == "cold": + # Filter out unnecessary operations + df = df[ + ~df["Application Name"].isin(["PowerBI", "PowerBIEIM"]) + & (~df["Text Data"].str.startswith("EVALUATE {1}")) + ] + query_begin = df["Event Class"] == "QueryBegin" + # Name queries per dictionary + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) + else: + # Filter out unnecessary operations + df = df[(~df["Text Data"].str.startswith("EVALUATE {1}"))] + query_begin = df["Event Class"] == "QueryBegin" + # Name queries per dictionary + suffix = "_removeXXX" + query_names_full = [ + item + for query in query_names + for item in (f"{query}{suffix}", query) + ] + # Step 3: Assign query names by group and convert to integer + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + # Step 4: Map to full query names + df["Query Name"] = df["Query Name"].map( + lambda x: query_names_full[x - 1] + ) + df = df[~df["Query Name"].str.endswith(suffix)] df = df.reset_index(drop=True) return df, query_results -def run_benchmark(dataset: str, dax_queries: dict, workspace: Optional[str] = None): +def run_benchmark( + dataset: str, + dax_queries: dict, + cache_type: str = "warm", + workspace: Optional[str] = None, +): if workspace is None: workspace = fabric.resolve_workspace_name() @@ -697,8 +725,11 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) # Run and save trace data - trace_result, query_result = trace_dax_warm( - dataset=dataset, 
workspace=workspace, dax_queries=dax_queries + trace_result, query_result = dax_perf_test( + dataset=dataset, + workspace=workspace, + dax_queries=dax_queries, + cache_type=cache_type, ) trace_schema = { From 46de49af691b40635c07d4fa71eff959a6f6f1b2 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 17 Nov 2024 14:53:19 +0200 Subject: [PATCH 11/24] added cache type --- src/sempy_labs/_dax.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 288747f4..4518ce1f 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -426,12 +426,7 @@ def dax_perf_test( if workspace is None: workspace = fabric.resolve_workspace_name() - cache_type = cache_type.lower() - cache_types = ["warm", "cold"] - if cache_type not in cache_types: - raise ValueError( - f"{icons.red_dot} Invalid cache type. Valid options: {cache_types}." - ) + cache_type = _validate_cache_type(cache_type) dl_tables = [] with connect_semantic_model( @@ -567,6 +562,17 @@ def dax_perf_test( return df, query_results +def _validate_cache_type(cache_type: str) -> str: + + cache_type = cache_type.lower() + cache_types = ["warm", "cold"] + if cache_type not in cache_types: + raise ValueError( + f"{icons.red_dot} Invalid cache type. Valid options: {cache_types}." + ) + return cache_type + + def run_benchmark( dataset: str, dax_queries: dict, @@ -577,6 +583,8 @@ def run_benchmark( if workspace is None: workspace = fabric.resolve_workspace_name() + cache_type = _validate_cache_type(cache_type) + # Get RunId table_name = "SLL_Measures" @@ -777,6 +785,7 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Dataset_Name": dataset, "Query_Name": query_name, "Query_Text": query_text, + "Cache_Type": cache_type, "Duration": df_trace[df_trace["Event Class"] == "QueryEnd"][ "Duration" ].sum(), From 9a841eaa13641b44f9c464003c202b3af0b2db34 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 19 Nov 2024 09:39:08 +0200 Subject: [PATCH 12/24] added extra columns to benchmark function --- src/sempy_labs/_dax.py | 10 ++++++++++ src/sempy_labs/_helper_functions.py | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 4518ce1f..9e3b8f2d 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -12,6 +12,7 @@ _get_max_run_id, resolve_lakehouse_name, save_as_delta_table, + _resolve_workspace_capacity_name_id_sku, ) from sempy_labs.lakehouse._lakehouse import lakehouse_attached from sempy_labs._clear_cache import clear_cache @@ -583,6 +584,10 @@ def run_benchmark( if workspace is None: workspace = fabric.resolve_workspace_name() + workspace_id = fabric.resolve_workspace_id(workspace) + capacity_id, capacity_name, sku = _resolve_workspace_capacity_name_id_sku(workspace) + dataset_id = resolve_dataset_id(dataset, workspace) + cache_type = _validate_cache_type(cache_type) # Get RunId @@ -781,8 +786,13 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) ] new_data = { + "Capacity Name": capacity_name, + "Capacity Id": capacity_id, + "SKU": sku, "Workspace_Name": workspace, + "Workspace Id": workspace_id, "Dataset_Name": dataset, + "Dataset Id": dataset_id, "Query_Name": query_name, "Query_Text": query_text, "Cache_Type": cache_type, diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 8c3bb559..001c82eb 100644 --- a/src/sempy_labs/_helper_functions.py +++ 
b/src/sempy_labs/_helper_functions.py @@ -785,6 +785,27 @@ def get_capacity_name(workspace: Optional[str] = None) -> str: return dfC_filt["Display Name"].iloc[0] +def _resolve_workspace_capacity_name_id_sku( + workspace: Optional[str] = None, +) -> Tuple[UUID, str, str]: + """ """ + + workspace = fabric.resolve_workspace_name(workspace) + filter_condition = urllib.parse.quote(workspace) + dfW = fabric.list_workspaces(filter=f"name eq '{filter_condition}'") + capacity_id = dfW["Capacity Id"].iloc[0] + dfC = fabric.list_capacities() + dfC_filt = dfC[dfC["Id"] == capacity_id] + if len(dfC_filt) == 1: + capacity_name = dfC_filt["Display Name"].iloc[0] + sku = dfC_filt["Sku"].iloc[0] + else: + capacity_name = None + sku = None + + return capacity_id, capacity_name, sku + + def resolve_capacity_name(capacity_id: Optional[UUID] = None) -> str: """ Obtains the capacity name for a given capacity Id. From 12cb7d4cc9038755ea46ffbd48d9a2c6a8d8112b Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 19 Nov 2024 09:44:06 +0200 Subject: [PATCH 13/24] added region --- src/sempy_labs/_dax.py | 3 ++- src/sempy_labs/_helper_functions.py | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 9e3b8f2d..b23cdd55 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -585,7 +585,7 @@ def run_benchmark( workspace = fabric.resolve_workspace_name() workspace_id = fabric.resolve_workspace_id(workspace) - capacity_id, capacity_name, sku = _resolve_workspace_capacity_name_id_sku(workspace) + capacity_id, capacity_name, sku, region = _resolve_workspace_capacity_name_id_sku(workspace) dataset_id = resolve_dataset_id(dataset, workspace) cache_type = _validate_cache_type(cache_type) @@ -789,6 +789,7 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Capacity Name": capacity_name, "Capacity Id": capacity_id, "SKU": sku, + "Region": region, "Workspace_Name": workspace, "Workspace Id": workspace_id, "Dataset_Name": dataset, diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 001c82eb..70f83dfa 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -787,8 +787,7 @@ def get_capacity_name(workspace: Optional[str] = None) -> str: def _resolve_workspace_capacity_name_id_sku( workspace: Optional[str] = None, -) -> Tuple[UUID, str, str]: - """ """ +) -> Tuple[UUID, str, str, str]: workspace = fabric.resolve_workspace_name(workspace) filter_condition = urllib.parse.quote(workspace) @@ -799,11 +798,13 @@ def _resolve_workspace_capacity_name_id_sku( if len(dfC_filt) == 1: capacity_name = dfC_filt["Display Name"].iloc[0] sku = dfC_filt["Sku"].iloc[0] + region = dfC_filt["Region"].iloc[0] else: capacity_name = None sku = None + region = None - return capacity_id, capacity_name, sku + return capacity_id, capacity_name, sku, region def resolve_capacity_name(capacity_id: Optional[UUID] = None) -> str: From 324fcd4a78f5f1445f6d819b80b3c11fa96e1874 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 19 Nov 2024 10:11:27 +0200 Subject: [PATCH 14/24] support for id --- src/sempy_labs/_helper_functions.py | 40 ++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 70f83dfa..8e68549c 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -542,13 +542,13 @@ def language_validate(language: str): return 
lang -def resolve_workspace_name_and_id(workspace: Optional[str] = None) -> Tuple[str, str]: +def resolve_workspace_name_and_id(workspace: Optional[str | UUID] = None) -> Tuple[str, UUID]: """ Obtains the name and ID of the Fabric workspace. Parameters ---------- - workspace : str, default=None + workspace : str | UUID, default=None The Fabric workspace name. Defaults to None which resolves to the workspace of the attached lakehouse or if no lakehouse attached, resolves to the workspace of the notebook. @@ -561,11 +561,12 @@ def resolve_workspace_name_and_id(workspace: Optional[str] = None) -> Tuple[str, if workspace is None: workspace_id = fabric.get_workspace_id() - workspace = fabric.resolve_workspace_name(workspace_id) + workspace_name = fabric.resolve_workspace_name(workspace_id) else: workspace_id = fabric.resolve_workspace_id(workspace) + workspace_name = fabric.resolve_workspace_name(workspace_id) - return str(workspace), str(workspace_id) + return workspace_name, workspace_id def _extract_json(dataframe: pd.DataFrame) -> dict: @@ -786,11 +787,12 @@ def get_capacity_name(workspace: Optional[str] = None) -> str: def _resolve_workspace_capacity_name_id_sku( - workspace: Optional[str] = None, + workspace: Optional[str | UUID] = None, ) -> Tuple[UUID, str, str, str]: - workspace = fabric.resolve_workspace_name(workspace) - filter_condition = urllib.parse.quote(workspace) + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + + filter_condition = urllib.parse.quote(workspace_name) dfW = fabric.list_workspaces(filter=f"name eq '{filter_condition}'") capacity_id = dfW["Capacity Id"].iloc[0] dfC = fabric.list_capacities() @@ -1167,3 +1169,27 @@ def _get_max_run_id(lakehouse: str, table_name: str) -> int: max_run_id = dfSpark.collect()[0][0] return max_run_id + + +def _is_valid_uuid( + guid: str, +): + """ + Validates if a string is a valid GUID in version 4 + + Parameters + ---------- + guid : str + GUID to be validated. + + Returns + ------- + bool + Boolean that indicates if the string is a GUID or not. 
+ """ + + try: + UUID(str(guid), version=4) + return True + except ValueError: + return False From 6754655fed2c2498d81731a017fec75f381a2762 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 19 Nov 2024 11:34:06 +0200 Subject: [PATCH 15/24] updated benchmark based on darren's feedback --- src/sempy_labs/_dax.py | 44 +++++++++++++++++++++++++---- src/sempy_labs/_helper_functions.py | 4 ++- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index b23cdd55..90fe1461 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -585,7 +585,9 @@ def run_benchmark( workspace = fabric.resolve_workspace_name() workspace_id = fabric.resolve_workspace_id(workspace) - capacity_id, capacity_name, sku, region = _resolve_workspace_capacity_name_id_sku(workspace) + capacity_id, capacity_name, sku, region = _resolve_workspace_capacity_name_id_sku( + workspace + ) dataset_id = resolve_dataset_id(dataset, workspace) cache_type = _validate_cache_type(cache_type) @@ -751,7 +753,9 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Query_Name": "string", "Query_Text": "string", "Duration": "long", + "SE_Duration": "long", "SE_CPU": "long", + "SE_Cache": "long", "SE_Queries": "long", "Column_Dependencies": "str", "Column_Dependencies_Size": "long", @@ -772,6 +776,34 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) & (df_trace["Event Class"].str.endswith("End")) ] + # SE Cache: # of times the cache match event occurred + se_cache = len(df_trace[df_trace["Event Class"] == "VertiPaqSEQueryCacheMatch"]) + + # Total Time -> QueryEnd Duration + total_duration = df_trace[df_trace["Event Class"] == "QueryEnd"][ + "Duration" + ].sum() + + # SE Duration: Sum of Duration for Vertipaq End or DQEnd event + se_duration = df_trace[ + (df_trace["Event Class"].str.endswith("End")) + & (df_trace["Event Class"] != "QueryEnd") + ]["Duration"].sum() + + # SE CPU: Sum of CPU for Vertipaq End or DQEnd event + se_cpu = se_duration = df_trace[ + (df_trace["Event Class"].str.endswith("End")) + & (df_trace["Event Class"] != "QueryEnd") + ]["Cpu Time"].sum() + + # SE Queries: # of times the Vertipaq End or DQEnd event occurred + se_queries = len( + df_trace[ + (df_trace["Event Class"].str.endswith("End")) + & (df_trace["Event Class"] != "QueryEnd") + ] + ) + # Collect query dependencies dep = get_dax_query_dependencies( dataset=dataset, @@ -797,11 +829,11 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Query_Name": query_name, "Query_Text": query_text, "Cache_Type": cache_type, - "Duration": df_trace[df_trace["Event Class"] == "QueryEnd"][ - "Duration" - ].sum(), - "SE_CPU": df_trace["Cpu Time"].sum(), - "SE_Queries": len(df_trace) - 1, + "Duration": total_duration, + "SE_Duration": se_duration, + "SE_Cache": se_cache, + "SE_CPU": se_cpu, + "SE_Queries": se_queries, "Column_Dependencies": str(table_column_list), "Column_Dependencies_Size": total_size, "RunId": run_id, diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 8e68549c..f21b36d6 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -542,7 +542,9 @@ def language_validate(language: str): return lang -def resolve_workspace_name_and_id(workspace: Optional[str | UUID] = None) -> Tuple[str, UUID]: +def resolve_workspace_name_and_id( + workspace: Optional[str | UUID] = None, +) -> Tuple[str, UUID]: """ Obtains the name and ID of the Fabric 
workspace. From 093ebc02b32a02fdc718fd7e98f3ca52f3a930f1 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 21 Nov 2024 11:17:57 +0200 Subject: [PATCH 16/24] fixes --- docs/source/conf.py | 4 ++-- environment.yml | 2 +- pyproject.toml | 4 ++-- src/sempy_labs/_dax.py | 38 +++++++++++++++++++++----------------- 4 files changed, 26 insertions(+), 22 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index a3996d49..c677dc70 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -13,7 +13,7 @@ project = 'semantic-link-labs' copyright = '2024, Microsoft and community' author = 'Microsoft and community' -release = '0.8.3' +release = '0.8.6' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration @@ -43,4 +43,4 @@ # List of packages we don't want to install in the environment autodoc_mock_imports = ['delta', 'synapse', 'jwt', 'semantic-link-sempy', 'pyspark', 'powerbiclient'] -napoleon_numpy_docstring = True \ No newline at end of file +napoleon_numpy_docstring = True diff --git a/environment.yml b/environment.yml index 01c321d6..fc8058a1 100644 --- a/environment.yml +++ b/environment.yml @@ -6,7 +6,7 @@ dependencies: - pytest-cov - pytest-mock - pip: - - semantic-link-sempy>=0.8.0 + - semantic-link-sempy>=0.8.3 - azure-identity==1.7.1 - azure-storage-blob>=12.9.0 - pandas-stubs diff --git a/pyproject.toml b/pyproject.toml index 20746045..68beda2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name="semantic-link-labs" authors = [ { name = "Microsoft Corporation" }, ] -version="0.8.3" +version="0.8.6" description="Semantic Link Labs for Microsoft Fabric" readme="README.md" requires-python=">=3.10,<3.12" @@ -23,7 +23,7 @@ classifiers = [ license= { text = "MIT License" } dependencies = [ - "semantic-link-sempy>=0.8.0", + "semantic-link-sempy>=0.8.3", "anytree", "powerbiclient", "polib", diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 90fe1461..9d54490e 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -13,11 +13,12 @@ resolve_lakehouse_name, save_as_delta_table, _resolve_workspace_capacity_name_id_sku, + format_dax_object_name, ) from sempy_labs.lakehouse._lakehouse import lakehouse_attached from sempy_labs._clear_cache import clear_cache from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables - +import tqdm @log def evaluate_dax_impersonation( @@ -104,6 +105,8 @@ def get_dax_query_dependencies( A pandas dataframe showing the dependent columns of a given DAX query including model dependencies. 
""" + from sempy_labs._model_dependencies import get_model_calc_dependencies + if workspace is None: workspace = fabric.resolve_workspace_name(workspace) @@ -117,7 +120,7 @@ def get_dax_query_dependencies( "Referenced Object Type",[REFERENCED_OBJECT_TYPE], "Referenced Table", [REFERENCED_TABLE], "Referenced Object", [REFERENCED_OBJECT] - ) + ) RETURN all_dependencies """ dep = fabric.evaluate_dax( @@ -737,7 +740,7 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) schema=df_schema, ) - collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) + #collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) # Run and save trace data trace_result, query_result = dax_perf_test( @@ -748,10 +751,15 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) ) trace_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", "Workspace_Name": "string", + "Workspace_Id": "string", "Dataset_Name": "string", + "Dataset_Id": "string", "Query_Name": "string", "Query_Text": "string", + "Cache_Type": "string", "Duration": "long", "SE_Duration": "long", "SE_CPU": "long", @@ -818,21 +826,21 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) ] new_data = { - "Capacity Name": capacity_name, - "Capacity Id": capacity_id, + "Capacity_Name": capacity_name, + "Capacity_Id": capacity_id, "SKU": sku, "Region": region, "Workspace_Name": workspace, - "Workspace Id": workspace_id, + "Workspace_Id": workspace_id, "Dataset_Name": dataset, - "Dataset Id": dataset_id, - "Query_Name": query_name, - "Query_Text": query_text, + "Dataset_Id": dataset_id, + "Query_Name": str(query_name), + "Query_Text": str(query_text), "Cache_Type": cache_type, "Duration": total_duration, "SE_Duration": se_duration, - "SE_Cache": se_cache, "SE_CPU": se_cpu, + "SE_Cache": se_cache, "SE_Queries": se_queries, "Column_Dependencies": str(table_column_list), "Column_Dependencies_Size": total_size, @@ -845,16 +853,12 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) else: df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True) + df['Query_Text'] = df['Query_Text'].astype(str) + print(df.dtypes) + save_as_delta_table( dataframe=df, delta_table_name="SLL_PerfBenchmark", write_mode="append", schema=trace_schema, ) - - -# def analyze_benchmark_results(): -# """ -# Compares the perf results of the latest test with previous tests. Output is reason(s) why perf improved or degraded. 
-# """ -# print('hi') From 269e98ca4a112ff092c657a9f9eba298a15e3dac Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 21 Nov 2024 15:02:15 +0200 Subject: [PATCH 17/24] updates --- src/sempy_labs/_dax.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 9d54490e..4d65be66 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -853,8 +853,7 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) else: df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True) - df['Query_Text'] = df['Query_Text'].astype(str) - print(df.dtypes) + df['Query_Text'] = df['Query_Text'].astype(str) save_as_delta_table( dataframe=df, From 8ff68e7bbd744987b006d507b3dfd2d7828c7e3d Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 26 Nov 2024 10:51:10 +0200 Subject: [PATCH 18/24] update --- src/sempy_labs/_dax.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 4d65be66..2bd1043b 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -835,7 +835,7 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Dataset_Name": dataset, "Dataset_Id": dataset_id, "Query_Name": str(query_name), - "Query_Text": str(query_text), + "Query_Text": query_text, "Cache_Type": cache_type, "Duration": total_duration, "SE_Duration": se_duration, @@ -855,9 +855,11 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) df['Query_Text'] = df['Query_Text'].astype(str) - save_as_delta_table( - dataframe=df, - delta_table_name="SLL_PerfBenchmark", - write_mode="append", - schema=trace_schema, - ) + return df + + #save_as_delta_table( + # dataframe=df, + # delta_table_name="SLL_PerfBenchmark", + # write_mode="append", + # schema=trace_schema, + #) From e83e4c1ce8e196a84ff6d5e35dac554ac9f5657c Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 26 Nov 2024 17:48:29 +0200 Subject: [PATCH 19/24] many fixes --- src/sempy_labs/_dax.py | 170 ++++++++++++++++++++++++++++++++--------- 1 file changed, 132 insertions(+), 38 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 2bd1043b..cf91503b 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -20,6 +20,7 @@ from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables import tqdm + @log def evaluate_dax_impersonation( dataset: str, @@ -76,11 +77,77 @@ def evaluate_dax_impersonation( return df +def _get_dax_query_dependencies_all( + dataset: str, + dax_string: str, + workspace: Optional[str] = None, +) -> pd.DataFrame: + + from sempy_labs._model_dependencies import get_model_calc_dependencies + + if workspace is None: + workspace = fabric.resolve_workspace_name(workspace) + + # Escape quotes in dax + dax_string = dax_string.replace('"', '""') + final_query = f""" + EVALUATE + VAR source_query = "{dax_string}" + VAR all_dependencies = SELECTCOLUMNS( + INFO.CALCDEPENDENCY("QUERY", source_query), + "Referenced Object Type",[REFERENCED_OBJECT_TYPE], + "Referenced Table", [REFERENCED_TABLE], + "Referenced Object", [REFERENCED_OBJECT] + ) + RETURN all_dependencies + """ + dep = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=final_query + ) + + # Clean up column names and values (remove outside square brackets, underscorees in object type) + dep.columns = dep.columns.map(lambda x: x[1:-1]) + dep["Referenced Object Type"] = ( + dep["Referenced Object 
Type"].str.replace("_", " ").str.title() + ) + dep + + # Dataframe df will contain the output of all dependencies of the objects used in the query + df = dep.copy() + + cd = get_model_calc_dependencies(dataset=dataset, workspace=workspace) + + for _, r in dep.iterrows(): + ot = r["Referenced Object Type"] + object_name = r["Referenced Object"] + table_name = r["Referenced Table"] + cd_filt = cd[ + (cd["Object Type"] == ot) + & (cd["Object Name"] == object_name) + & (cd["Table Name"] == table_name) + ] + + # Adds in the dependencies of each object used in the query (i.e. relationship etc.) + if len(cd_filt) > 0: + subset = cd_filt[ + ["Referenced Object Type", "Referenced Table", "Referenced Object"] + ] + df = pd.concat([df, subset], ignore_index=True) + + df.columns = df.columns.map(lambda x: x.replace("Referenced ", "")) + df = df[(~df["Object"].str.startswith("RowNumber-"))] + # Remove duplicates + df = df.drop_duplicates().reset_index(drop=True) + + return df + + @log def get_dax_query_dependencies( dataset: str, dax_string: str, put_in_memory: bool = False, + show_vertipaq_stats: bool = True, workspace: Optional[str] = None, ) -> pd.DataFrame: """ @@ -94,6 +161,8 @@ def get_dax_query_dependencies( The DAX query. put_in_memory : bool, default=False If True, ensures that the dependent columns are put into memory in order to give realistic Vertipaq stats (i.e. Total Size etc.). + show_vertipaq_stats : bool, default=True + If True, shows Vertipaq statistics. workspace : str, default=None The Fabric workspace name. Defaults to None which resolves to the workspace of the attached lakehouse @@ -110,6 +179,9 @@ def get_dax_query_dependencies( if workspace is None: workspace = fabric.resolve_workspace_name(workspace) + if put_in_memory: + show_vertipaq_stats = True + # Escape quotes in dax dax_string = dax_string.replace('"', '""') final_query = f""" @@ -166,22 +238,25 @@ def get_dax_query_dependencies( ] # Get vertipaq stats, filter to just the objects in the df dataframe - df["Full Object"] = format_dax_object_name(df["Table"], df["Object"]) - dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True) - dfC["Full Object"] = format_dax_object_name(dfC["Table Name"], dfC["Column Name"]) - - dfC_filtered = dfC[dfC["Full Object"].isin(df["Full Object"].values)][ - [ - "Table Name", - "Column Name", - "Total Size", - "Data Size", - "Dictionary Size", - "Hierarchy Size", - "Is Resident", - "Full Object", - ] - ].reset_index(drop=True) + if show_vertipaq_stats: + df["Full Object"] = format_dax_object_name(df["Table"], df["Object"]) + dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True) + dfC["Full Object"] = format_dax_object_name( + dfC["Table Name"], dfC["Column Name"] + ) + + dfC_filtered = dfC[dfC["Full Object"].isin(df["Full Object"].values)][ + [ + "Table Name", + "Column Name", + "Total Size", + "Data Size", + "Dictionary Size", + "Hierarchy Size", + "Is Resident", + "Full Object", + ] + ].reset_index(drop=True) if put_in_memory: not_in_memory = dfC_filtered[dfC_filtered["Is Resident"] == False] @@ -225,7 +300,8 @@ def get_dax_query_dependencies( ] ].reset_index(drop=True) - dfC_filtered.drop(["Full Object"], axis=1, inplace=True) + if show_vertipaq_stats: + dfC_filtered.drop(["Full Object"], axis=1, inplace=True) return dfC_filtered @@ -731,8 +807,6 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) } print(f"{icons.in_progress} Saving semantic model metadata...") for name, (df, df_schema) in dfs.items(): - 
df.columns = df.columns.str.replace(" ", "_") - save_as_delta_table( dataframe=df, delta_table_name=f"SLL_{name}", @@ -740,7 +814,9 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) schema=df_schema, ) - #collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) + return dfC + + dfC = collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) # Run and save trace data trace_result, query_result = dax_perf_test( @@ -753,6 +829,8 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) trace_schema = { "Capacity_Name": "string", "Capacity_Id": "string", + "SKU": "string", + "Region": "string", "Workspace_Name": "string", "Workspace_Id": "string", "Dataset_Name": "string", @@ -765,12 +843,14 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "SE_CPU": "long", "SE_Cache": "long", "SE_Queries": "long", - "Column_Dependencies": "str", + "Column_Dependencies": "string", "Column_Dependencies_Size": "long", + "Measure_Dependencies": "string", + "Relationship_Dependencies": "string", "RunId": "long", "Timestamp": "timestamp", } - df = pd.DataFrame(columns=list(trace_schema.keys())) # 'SE Duration' + df = pd.DataFrame(columns=list(trace_schema.keys())) for query_name in trace_result["Query Name"].unique().tolist(): df_trace = trace_result[trace_result["Query Name"] == query_name] @@ -813,17 +893,31 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) ) # Collect query dependencies - dep = get_dax_query_dependencies( + dep = _get_dax_query_dependencies_all( dataset=dataset, workspace=workspace, dax_string=query_text, - put_in_memory=False, ) - total_size = dep["Total Size"].sum() - table_column_list = [ - f"'{table}'[{column}]" - for table, column in zip(dep["Table Name"], dep["Column Name"]) + + # Column dependencies + filtered_dep = dep[dep["Object Type"].isin(["Column", "Calc Column"])][ + ["Table", "Object"] + ] + columns_used = [ + f"'{table}'[{obj}]" + for table, obj in zip(filtered_dep["Table"], filtered_dep["Object"]) ] + dfC["Object"] = format_dax_object_name(dfC["Table_Name"], dfC["Column_Name"]) + dfC_filt = dfC[dfC["Object"].isin(columns_used)] + total_size = dfC_filt["Total_Size"].sum() + + # Measure dependencies + measures_used = dep[dep["Object Type"] == "Measure"]["Object"].tolist() + + # Relationship dependencies + relationships_used = dep[dep["Object Type"] == "Relationship"][ + "Object" + ].tolist() new_data = { "Capacity_Name": capacity_name, @@ -842,8 +936,10 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "SE_CPU": se_cpu, "SE_Cache": se_cache, "SE_Queries": se_queries, - "Column_Dependencies": str(table_column_list), + "Column_Dependencies": str(columns_used), "Column_Dependencies_Size": total_size, + "Measure_Dependencies": str(measures_used), + "Relationship_Dependencies": str(relationships_used), "RunId": run_id, "Timestamp": time_stamp, } @@ -853,13 +949,11 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) else: df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True) - df['Query_Text'] = df['Query_Text'].astype(str) + df["Query_Text"] = df["Query_Text"].astype(str) - return df - - #save_as_delta_table( - # dataframe=df, - # delta_table_name="SLL_PerfBenchmark", - # write_mode="append", - # schema=trace_schema, - #) + save_as_delta_table( + dataframe=df, + delta_table_name="SLL_PerfBenchmark", + write_mode="append", + schema=trace_schema, + ) 
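
A minimal usage sketch of the benchmarking entry points built up in the commits above (dax_perf_test and run_benchmark in sempy_labs._dax), as they stand at this point in the series. It assumes a Fabric notebook session with a lakehouse attached (run_benchmark appends to the SLL_* delta tables); the semantic model name "AdventureWorks", the workspace name "Benchmark WS", and the DAX query are placeholders, not part of any commit.

from sempy_labs._dax import dax_perf_test, run_benchmark

# Placeholder queries: {query name: DAX query text}.
queries = {
    "Sales Amount Test": 'EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount])',
}

# Warm-cache test: returns the profiler trace plus each query's result dataframe.
trace_df, query_results = dax_perf_test(
    dataset="AdventureWorks",  # placeholder semantic model name
    dax_queries=queries,
    cache_type="warm",
    workspace="Benchmark WS",  # placeholder workspace name
)

# Full benchmark run: traces the queries and appends the results, together with
# the collected model metadata, to the SLL_* delta tables in the attached lakehouse.
run_benchmark(
    dataset="AdventureWorks",
    dax_queries=queries,
    cache_type="cold",  # 'cold' is only accepted for Direct Lake models
    workspace="Benchmark WS",
)
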
From 15e3bf57910d43d8b3b5b90dda2dc72ab7aea428 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 28 Nov 2024 09:01:59 +0200 Subject: [PATCH 20/24] added extra properties --- src/sempy_labs/_dax.py | 66 ++++++++++++++++++++++++++--- src/sempy_labs/_helper_functions.py | 18 ++++++++ src/sempy_labs/_vertipaq.py | 12 ++---- 3 files changed, 82 insertions(+), 14 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index cf91503b..cd2ef849 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -14,6 +14,7 @@ save_as_delta_table, _resolve_workspace_capacity_name_id_sku, format_dax_object_name, + _conv_model_size, ) from sempy_labs.lakehouse._lakehouse import lakehouse_attached from sempy_labs._clear_cache import clear_cache @@ -668,7 +669,6 @@ def run_benchmark( workspace ) dataset_id = resolve_dataset_id(dataset, workspace) - cache_type = _validate_cache_type(cache_type) # Get RunId @@ -692,8 +692,14 @@ def run_benchmark( time_stamp = datetime.datetime.now() def add_cols(df, run_id, time_stamp): - df.insert(0, "Workspace Name", workspace) - df.insert(1, "Dataset Name", dataset) + df.insert(0, "Capacity Name", capacity_name) + df.insert(1, "Capacity Id", capacity_id) + df.insert(2, "SKU", sku) + df.insert(3, "Region", region) + df.insert(4, "Workspace Name", workspace) + df.insert(5, "Workspace Id", workspace_id) + df.insert(6, "Dataset Name", dataset) + df.insert(7, "Dataset Id", dataset_id) df["RunId"] = run_id df["Timestamp"] = time_stamp @@ -701,6 +707,8 @@ def add_cols(df, run_id, time_stamp): def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None): + from sempy_labs._list_functions import list_tables + dfM = fabric.list_measures(dataset=dataset, workspace=workspace)[ ["Table Name", "Measure Name", "Measure Expression"] ] @@ -719,7 +727,7 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) ] ] dfC = dfC[dfC["Type"] != "RowNumber"] - dfT = fabric.list_tables(dataset=dataset, workspace=workspace, extended=True)[ + dfT = list_tables(dataset=dataset, workspace=workspace, extended=True)[ ["Name", "Type", "Row Count"] ] dfT = dfT.rename(columns={"Name": "Table Name"}) @@ -741,15 +749,42 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) ] ] + dfRLS = fabric.get_row_level_security_permissions(dataset=dataset, workspace=workspace) + + total_size = dfC['Total Size'].sum() + total_size = _conv_model_size(db_total_size=total_size) + dfModel = pd.DataFrame({'Model Size': [total_size]}) + dfM = add_cols(dfM, run_id, time_stamp) dfC = add_cols(dfC, run_id, time_stamp) dfT = add_cols(dfT, run_id, time_stamp) dfR = add_cols(dfR, run_id, time_stamp) dfP = add_cols(dfP, run_id, time_stamp) - + dfRLS = add_cols(dfRLS, run_id, time_stamp) + dfModel = add_cols(dfModel, run_id, time_stamp) + + dfModel_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Model_Size": "long", + "RunId": "long", + "Timestamp": "timestamp", + } dfM_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", "Workspace_Name": "string", + "Workspace_Id": "string", "Dataset_Name": "string", + "Dataset_Id": "string", "Table_Name": "string", "Measure_Name": "string", "Measure_Expression": "string", @@ -757,8 +792,14 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) 
"Timestamp": "timestamp", } dfC_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", "Workspace_Name": "string", + "Workspace_Id": "string", "Dataset_Name": "string", + "Dataset_Id": "string", "Table_Name": "string", "Column_Name": "string", "Type": "string", @@ -773,17 +814,30 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Timestamp": "timestamp", } dfT_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", "Workspace_Name": "string", + "Workspace_Id": "string", "Dataset_Name": "string", + "Dataset_Id": "string", "Table_Name": "string", "Type": "string", "Row_Count": "long", + "Table_Size": "long", "RunId": "long", "Timestamp": "timestamp", } dfP_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", "Workspace_Name": "string", + "Workspace_Id": "string", "Dataset_Name": "string", + "Dataset_Id": "string", "Table_Name": "string", "Partition_Name": "string", "Mode": "string", @@ -804,6 +858,8 @@ def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None) "Tables": [dfT, dfT_schema], "Relationships": [dfR, None], "Partitions": [dfP, dfP_schema], + "RowLevelSecurity": [dfRLS, None], + "Model": [dfModel, dfModel_schema], } print(f"{icons.in_progress} Saving semantic model metadata...") for name, (df, df_schema) in dfs.items(): diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index f21b36d6..20953fc1 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -1195,3 +1195,21 @@ def _is_valid_uuid( return True except ValueError: return False + + +def _conv_model_size(db_total_size: int): + + """ + Converting to KB/MB/GB necessitates division by 1024 * 1000. + """ + + if db_total_size >= 1000000000: + y = db_total_size / (1024**3) * 1000000000 + elif db_total_size >= 1000000: + y = db_total_size / (1024**2) * 1000000 + elif db_total_size >= 1000: + y = db_total_size / (1024) * 1000 + else: + y = db_total_size + + return round(y) diff --git a/src/sempy_labs/_vertipaq.py b/src/sempy_labs/_vertipaq.py index 8afbf5dd..167441f4 100644 --- a/src/sempy_labs/_vertipaq.py +++ b/src/sempy_labs/_vertipaq.py @@ -14,6 +14,7 @@ save_as_delta_table, resolve_workspace_capacity, _get_max_run_id, + _conv_model_size, ) from sempy_labs._list_functions import list_relationships, list_tables from sempy_labs.lakehouse import lakehouse_attached, get_lakehouse_tables @@ -396,19 +397,12 @@ def vertipaq_analyzer( export_Hier = dfH_filt.copy() # Model - # Converting to KB/MB/GB necessitates division by 1024 * 1000. 
- if db_total_size >= 1000000000: - y = db_total_size / (1024**3) * 1000000000 - elif db_total_size >= 1000000: - y = db_total_size / (1024**2) * 1000000 - elif db_total_size >= 1000: - y = db_total_size / (1024) * 1000 - y = round(y) + model_size = _conv_model_size(db_total_size) dfModel = pd.DataFrame( { "Dataset Name": dataset, - "Total Size": y, + "Total Size": model_size, "Table Count": table_count, "Column Count": column_count, "Compatibility Level": compat_level, From bfa55e42cb535fe42fe7ae72d244e174d1cf695f Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 28 Nov 2024 14:47:54 +0200 Subject: [PATCH 21/24] separate save semantic model metadata, create bulk function --- src/sempy_labs/__init__.py | 4 + src/sempy_labs/_dax.py | 188 +---------------- src/sempy_labs/_documentation.py | 307 ++++++++++++++++++++++++++++ src/sempy_labs/_helper_functions.py | 19 +- 4 files changed, 326 insertions(+), 192 deletions(-) diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py index c56511a8..d8b6abf2 100644 --- a/src/sempy_labs/__init__.py +++ b/src/sempy_labs/__init__.py @@ -1,3 +1,6 @@ +from sempy_labs._documentation import ( + save_semantic_model_metadata, +) from sempy_labs._external_data_shares import ( list_external_data_shares_in_item, create_external_data_share, @@ -377,4 +380,5 @@ "migrate_fabric_trial_capacity", "create_resource_group", "trace_dax", + "save_semantic_model_metadata", ] diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index cd2ef849..7891e087 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -14,7 +14,6 @@ save_as_delta_table, _resolve_workspace_capacity_name_id_sku, format_dax_object_name, - _conv_model_size, ) from sempy_labs.lakehouse._lakehouse import lakehouse_attached from sempy_labs._clear_cache import clear_cache @@ -661,6 +660,8 @@ def run_benchmark( workspace: Optional[str] = None, ): + from sempy_labs._documentation import save_semantic_model_metadata + if workspace is None: workspace = fabric.resolve_workspace_name() @@ -691,188 +692,9 @@ def run_benchmark( run_id = _get_max_run_id(lakehouse=lakehouse_name, table_name=table_name) + 1 time_stamp = datetime.datetime.now() - def add_cols(df, run_id, time_stamp): - df.insert(0, "Capacity Name", capacity_name) - df.insert(1, "Capacity Id", capacity_id) - df.insert(2, "SKU", sku) - df.insert(3, "Region", region) - df.insert(4, "Workspace Name", workspace) - df.insert(5, "Workspace Id", workspace_id) - df.insert(6, "Dataset Name", dataset) - df.insert(7, "Dataset Id", dataset_id) - df["RunId"] = run_id - df["Timestamp"] = time_stamp - - return df - - def collect_metadata(dataset: str, run_id: int, workspace: Optional[str] = None): - - from sempy_labs._list_functions import list_tables - - dfM = fabric.list_measures(dataset=dataset, workspace=workspace)[ - ["Table Name", "Measure Name", "Measure Expression"] - ] - dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True)[ - [ - "Table Name", - "Column Name", - "Type", - "Data Type", - "Column Cardinality", - "Total Size", - "Data Size", - "Dictionary Size", - "Hierarchy Size", - "Encoding", - ] - ] - dfC = dfC[dfC["Type"] != "RowNumber"] - dfT = list_tables(dataset=dataset, workspace=workspace, extended=True)[ - ["Name", "Type", "Row Count"] - ] - dfT = dfT.rename(columns={"Name": "Table Name"}) - dfR = fabric.list_relationships(dataset=dataset, workspace=workspace) - dfP = fabric.list_partitions( - dataset=dataset, workspace=workspace, extended=True - )[ - [ - "Table Name", - "Partition Name", - 
"Mode", - "Source Type", - "Query", - "Refreshed Time", - "Modified Time", - "Record Count", - "Records per Segment", - "Segment Count", - ] - ] - - dfRLS = fabric.get_row_level_security_permissions(dataset=dataset, workspace=workspace) - - total_size = dfC['Total Size'].sum() - total_size = _conv_model_size(db_total_size=total_size) - dfModel = pd.DataFrame({'Model Size': [total_size]}) - - dfM = add_cols(dfM, run_id, time_stamp) - dfC = add_cols(dfC, run_id, time_stamp) - dfT = add_cols(dfT, run_id, time_stamp) - dfR = add_cols(dfR, run_id, time_stamp) - dfP = add_cols(dfP, run_id, time_stamp) - dfRLS = add_cols(dfRLS, run_id, time_stamp) - dfModel = add_cols(dfModel, run_id, time_stamp) - - dfModel_schema = { - "Capacity_Name": "string", - "Capacity_Id": "string", - "SKU": "string", - "Region": "string", - "Workspace_Name": "string", - "Workspace_Id": "string", - "Dataset_Name": "string", - "Dataset_Id": "string", - "Model_Size": "long", - "RunId": "long", - "Timestamp": "timestamp", - } - dfM_schema = { - "Capacity_Name": "string", - "Capacity_Id": "string", - "SKU": "string", - "Region": "string", - "Workspace_Name": "string", - "Workspace_Id": "string", - "Dataset_Name": "string", - "Dataset_Id": "string", - "Table_Name": "string", - "Measure_Name": "string", - "Measure_Expression": "string", - "RunId": "long", - "Timestamp": "timestamp", - } - dfC_schema = { - "Capacity_Name": "string", - "Capacity_Id": "string", - "SKU": "string", - "Region": "string", - "Workspace_Name": "string", - "Workspace_Id": "string", - "Dataset_Name": "string", - "Dataset_Id": "string", - "Table_Name": "string", - "Column_Name": "string", - "Type": "string", - "Data_Type": "string", - "Column_Cardinality": "long", - "Total_Size": "long", - "Data_Size": "long", - "Dictionary_Size": "long", - "Hierarchy_Size": "long", - "Encoding": "string", - "RunId": "long", - "Timestamp": "timestamp", - } - dfT_schema = { - "Capacity_Name": "string", - "Capacity_Id": "string", - "SKU": "string", - "Region": "string", - "Workspace_Name": "string", - "Workspace_Id": "string", - "Dataset_Name": "string", - "Dataset_Id": "string", - "Table_Name": "string", - "Type": "string", - "Row_Count": "long", - "Table_Size": "long", - "RunId": "long", - "Timestamp": "timestamp", - } - dfP_schema = { - "Capacity_Name": "string", - "Capacity_Id": "string", - "SKU": "string", - "Region": "string", - "Workspace_Name": "string", - "Workspace_Id": "string", - "Dataset_Name": "string", - "Dataset_Id": "string", - "Table_Name": "string", - "Partition_Name": "string", - "Mode": "string", - "Source_Type": "string", - "Query": "string", - "Refreshed_Time": "timestamp", - "Modified_Time": "timestamp", - "Record_Count": "long", - "Records_per_Segment": "double", - "Segment_Count": "long", - "RunId": "long", - "Timestamp": "timestamp", - } - - dfs = { - "Measures": [dfM, dfM_schema], - "Columns": [dfC, dfC_schema], - "Tables": [dfT, dfT_schema], - "Relationships": [dfR, None], - "Partitions": [dfP, dfP_schema], - "RowLevelSecurity": [dfRLS, None], - "Model": [dfModel, dfModel_schema], - } - print(f"{icons.in_progress} Saving semantic model metadata...") - for name, (df, df_schema) in dfs.items(): - save_as_delta_table( - dataframe=df, - delta_table_name=f"SLL_{name}", - write_mode="append", - schema=df_schema, - ) - - return dfC - - dfC = collect_metadata(dataset=dataset, workspace=workspace, run_id=run_id) + dfC = save_semantic_model_metadata( + dataset=dataset, workspace=workspace, run_id=run_id, time_stamp=time_stamp + ) # Run and save trace data 
trace_result, query_result = dax_perf_test( diff --git a/src/sempy_labs/_documentation.py b/src/sempy_labs/_documentation.py index 5d957489..86842ca4 100644 --- a/src/sempy_labs/_documentation.py +++ b/src/sempy_labs/_documentation.py @@ -2,6 +2,17 @@ import sempy.fabric as fabric import pandas as pd from typing import List, Optional +import sempy_labs._icons as icons +import datetime +from sempy_labs._helper_functions import ( + save_as_delta_table, + _conv_model_size, + _resolve_workspace_capacity_name_id_sku, + resolve_dataset_id, + _get_max_run_id, + resolve_workspace_name_and_id, +) +from sempy_labs.lakehouse import lakehouse_attached def list_all_items(workspaces: Optional[str | List[str]] = None): @@ -142,3 +153,299 @@ def get_calc_column_expression(table_name, column_name): ) return df + + +def save_semantic_model_metadata( + dataset: str, + workspace: Optional[str] = None, + run_id: Optional[int] = None, + time_stamp: Optional[datetime.datetime] = None, +): + + from sempy_labs._list_functions import list_tables + + (workspace, workspace_id) = resolve_workspace_name_and_id(workspace) + + if run_id is None: + run_id = _get_run_id(table_name='SLL_Measures') + + if time_stamp is None: + time_stamp = datetime.datetime.now() + + capacity_id, capacity_name, sku, region = _resolve_workspace_capacity_name_id_sku( + workspace + ) + dataset_id = resolve_dataset_id(dataset, workspace) + + print(f"{icons.in_progress} Collecting semantic model metadata...") + dfM = fabric.list_measures(dataset=dataset, workspace=workspace)[ + ["Table Name", "Measure Name", "Measure Expression"] + ] + dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True)[ + [ + "Table Name", + "Column Name", + "Type", + "Data Type", + "Column Cardinality", + "Total Size", + "Data Size", + "Dictionary Size", + "Hierarchy Size", + "Encoding", + ] + ] + + total_size = dfC["Total Size"].sum() + total_size = _conv_model_size(db_total_size=total_size) + dfModel = pd.DataFrame({"Model Size": [total_size]}) + + dfC = dfC[dfC["Type"] != "RowNumber"] + dfT = list_tables(dataset=dataset, workspace=workspace, extended=True)[ + ["Name", "Type", "Row Count"] + ] + dfT = dfT.rename(columns={"Name": "Table Name"}) + dfR = fabric.list_relationships(dataset=dataset, workspace=workspace) + dfP = fabric.list_partitions(dataset=dataset, workspace=workspace, extended=True)[ + [ + "Table Name", + "Partition Name", + "Mode", + "Source Type", + "Query", + "Refreshed Time", + "Modified Time", + "Record Count", + "Records per Segment", + "Segment Count", + ] + ] + + dfRLS = fabric.get_row_level_security_permissions( + dataset=dataset, workspace=workspace + ) + + dfH = fabric.list_hierarchies(dataset=dataset, workspace=workspace) + dfCI = fabric.list_calculation_items(dataset=dataset, workspace=workspace) + + def add_cols(df, run_id, time_stamp): + df.insert(0, "Capacity Name", capacity_name) + df.insert(1, "Capacity Id", capacity_id) + df.insert(2, "SKU", sku) + df.insert(3, "Region", region) + df.insert(4, "Workspace Name", workspace) + df.insert(5, "Workspace Id", workspace_id) + df.insert(6, "Dataset Name", dataset) + df.insert(7, "Dataset Id", dataset_id) + df["RunId"] = run_id + df["Timestamp"] = time_stamp + + return df + + dataframes = [dfM, dfC, dfT, dfR, dfP, dfRLS, dfModel, dfH, dfCI] + dataframes = [add_cols(df, run_id, time_stamp) for df in dataframes] + dfM, dfC, dfT, dfR, dfP, dfRLS, dfModel, dfH, dfCI = dataframes + + dfModel_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + 
"Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Model_Size": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + dfM_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Table_Name": "string", + "Measure_Name": "string", + "Measure_Expression": "string", + "RunId": "long", + "Timestamp": "timestamp", + } + dfC_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Table_Name": "string", + "Column_Name": "string", + "Type": "string", + "Data_Type": "string", + "Column_Cardinality": "long", + "Total_Size": "long", + "Data_Size": "long", + "Dictionary_Size": "long", + "Hierarchy_Size": "long", + "Encoding": "string", + "RunId": "long", + "Timestamp": "timestamp", + } + dfT_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Table_Name": "string", + "Type": "string", + "Row_Count": "long", + "Table_Size": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + dfP_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Table_Name": "string", + "Partition_Name": "string", + "Mode": "string", + "Source_Type": "string", + "Query": "string", + "Refreshed_Time": "timestamp", + "Modified_Time": "timestamp", + "Record_Count": "long", + "Records_per_Segment": "double", + "Segment_Count": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + dfH_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Table_Name": "string", + "Column_Name": "string", + "Hierarchy_Name": "string", + "Hierarchy_Description": "string", + "Hierarchy_State": "string", + "Level_Name": "string", + "Level_Description": "string", + "Level_Ordinal": "long", + "RunId": "long", + "Timestamp": "timestamp", + } + dfCI_schema = { + "Capacity_Name": "string", + "Capacity_Id": "string", + "SKU": "string", + "Region": "string", + "Workspace_Name": "string", + "Workspace_Id": "string", + "Dataset_Name": "string", + "Dataset_Id": "string", + "Calculation_Group_Name": "string", + "Hidden": "bool", + "Precedence": "long", + "Description": "string", + "Calculation_Item_Name": "string", + "Ordinal": "long", + "Expression": "string", + "Format_String_Expression": "string", + "State": "string", + "Error_Message": "string", + "RunId": "long", + "Timestamp": "timestamp", + } + + dfs = { + "Measures": [dfM, dfM_schema], + "Columns": [dfC, dfC_schema], + "Tables": [dfT, dfT_schema], + "Relationships": [dfR, None], + "Partitions": [dfP, dfP_schema], + "RowLevelSecurity": [dfRLS, None], + "Model": [dfModel, dfModel_schema], + "Hierarchies": [dfH, dfH_schema], + "CalculationItems": [dfCI, dfCI_schema], + } + print(f"{icons.in_progress} Saving semantic model metadata...") + for name, (df, df_schema) in dfs.items(): + if 
not df.empty: + save_as_delta_table( + dataframe=df, + delta_table_name=f"SLL_{name}", + write_mode="append", + schema=df_schema, + ) + else: + print( + f"{icons.yellow_dot} The '{dataset}' semantic model within the '{workspace}' workspace contains no {name.lower()}." + ) + + return dfC + + +def save_semantic_model_metadata_bulk(workspace: Optional[str | List[str]] = None): + + time_stamp = datetime.datetime.now() + run_id = _get_run_id(table_name='SLL_Measures') + if isinstance(workspace, str): + workspace = [workspace] + + dfW = fabric.list_workspaces() + if workspace is None: + workspaces = dfW['Name'].tolist() + else: + workspaces = dfW[dfW['Name'].isin(workspace)]['Name'].tolist() + + for w in workspaces: + dfD = fabric.list_datasets(workspace=w, mode='rest') + for _, r in dfD.iterrows(): + d_name = r['Dataset Name'] + save_semantic_model_metadata(dataset=d_name, workspace=w, run_id=run_id, time_stamp=time_stamp) + + +def _get_run_id(table_name: str) -> int: + + from sempy_labs.lakehouse import get_lakehouse_tables + + if not lakehouse_attached(): + raise ValueError( + f"{icons.red_dot} A lakehouse must be attached to the notebook." + ) + + dfLT = get_lakehouse_tables() + dfLT_filt = dfLT[dfLT["Table Name"] == table_name] + if len(dfLT_filt) == 0: + run_id = 1 + else: + lakehouse_id = fabric.get_lakehouse_id() + lakehouse_name = fabric.resolve_item_name( + item_id=lakehouse_id, type="Lakehouse", workspace=None + ) + + run_id = ( + _get_max_run_id(lakehouse=lakehouse_name, table_name=table_name) + 1 + ) + + return run_id diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 20953fc1..40999de8 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -434,11 +434,7 @@ def save_as_delta_table( TimestampType, ) - if workspace is None: - workspace_id = fabric.get_workspace_id() - workspace = fabric.resolve_workspace_name(workspace_id) - else: - workspace_id = fabric.resolve_workspace_id(workspace) + (workspace, workspace_id) = resolve_workspace_name_and_id(workspace) if lakehouse is None: lakehouse_id = fabric.get_lakehouse_id() @@ -501,9 +497,15 @@ def save_as_delta_table( ).save(filePath) else: spark_df.write.mode(write_mode).format("delta").save(filePath) - print( - f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace}' workspace." - ) + + if write_mode == "append": + print( + f"{icons.green_dot} The dataframe has been appended to the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace}' workspace." + ) + else: + print( + f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace}' workspace." + ) def language_validate(language: str): @@ -1198,7 +1200,6 @@ def _is_valid_uuid( def _conv_model_size(db_total_size: int): - """ Converting to KB/MB/GB necessitates division by 1024 * 1000.
""" From 9ede0f6bd1c496153ecb970e97c1216242869f4a Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 22 Dec 2024 09:14:56 +0200 Subject: [PATCH 22/24] added execution metrics --- src/sempy_labs/_dax.py | 6 ++++++ src/sempy_labs/_documentation.py | 23 +++++++++++++---------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 7891e087..54a6d0c7 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -349,6 +349,7 @@ def trace_dax( clear_cache_before_each_query: bool = False, trace_vertipaq_se: bool = False, trace_direct_query: bool = False, + enable_execution_metrics: bool = False, workspace: Optional[str] = None, ) -> Tuple[pd.DataFrame, dict]: """ @@ -374,6 +375,8 @@ def trace_dax( If True, adds the following events to the trace: VertiPaq SE Query Begin, VertiPaq SE Query End, VertiPaq SE Query Cache Match trace_direct_query : bool, default=False If True, adds the following events to the trace: Direct Query Begin, Direct Query End + enable_execution_metrics : bool, default=False + If True, adds the `Execution Metrics `_ to the trace. workspace : str, default=None The Fabric workspace name. Defaults to None which resolves to the workspace of the attached lakehouse @@ -417,6 +420,9 @@ def trace_dax( event_schema["DirectQueryBegin"] = dq_cols event_schema["DirectQueryEnd"] = dq_cols + if enable_execution_metrics: + event_schema["ExecutionMetrics"] = ["EventClass", "TextData", "ApplicationName"] + query_results = {} if clear_cache_before_run: diff --git a/src/sempy_labs/_documentation.py b/src/sempy_labs/_documentation.py index 86842ca4..6627112e 100644 --- a/src/sempy_labs/_documentation.py +++ b/src/sempy_labs/_documentation.py @@ -167,7 +167,7 @@ def save_semantic_model_metadata( (workspace, workspace_id) = resolve_workspace_name_and_id(workspace) if run_id is None: - run_id = _get_run_id(table_name='SLL_Measures') + run_id = _get_run_id(table_name="SLL_Measures") if time_stamp is None: time_stamp = datetime.datetime.now() @@ -408,21 +408,26 @@ def add_cols(df, run_id, time_stamp): def save_semantic_model_metadata_bulk(workspace: Optional[str | List[str]] = None): time_stamp = datetime.datetime.now() - run_id = _get_run_id(table_name='SLL_Measures') + run_id = _get_run_id(table_name="SLL_Measures") if isinstance(workspace, str): workspace = [workspace] dfW = fabric.list_workspaces() if workspace is None: - workspaces = dfW['Name'].tolist() + workspaces = dfW["Name"].tolist() else: - workspaces = dfW[dfW['Name'].isin(workspace)]['Name'].tolist() + workspaces = dfW[dfW["Name"].isin(workspace)]["Name"].tolist() for w in workspaces: - dfD = fabric.list_datasets(workspace=w, mode='rest') + dfD = fabric.list_datasets(workspace=w, mode="rest") for _, r in dfD.iterrows(): - d_name = r['Dataset Name'] - save_semantic_model_metadata(dataset=d_name, workspace=w, run_id=run_id, time_stamp=time_stamp) + d_name = r["Dataset Name"] + save_semantic_model_metadata( + dataset=d_name, + workspace=w, + run_id=run_id, + time_stamp=time_stamp, + ) def _get_run_id(table_name: str) -> int: @@ -444,8 +449,6 @@ def _get_run_id(table_name: str) -> int: item_id=lakehouse_id, type="Lakehouse", workspace=None ) - run_id = ( - _get_max_run_id(lakehouse=lakehouse_name, table_name=table_name) + 1 - ) + run_id = _get_max_run_id(lakehouse=lakehouse_name, table_name=table_name) + 1 return run_id From 26a0126ccade2afa644607ab4f5c1af720990cee Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 29 Dec 2024 13:28:59 +0200 Subject: [PATCH
23/24] update init --- docs/source/conf.py | 2 +- pyproject.toml | 2 +- src/sempy_labs/__init__.py | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index c677dc70..3ad169dc 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -13,7 +13,7 @@ project = 'semantic-link-labs' copyright = '2024, Microsoft and community' author = 'Microsoft and community' -release = '0.8.6' +release = '0.8.11' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/pyproject.toml b/pyproject.toml index 68beda2c..eaa24e1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name="semantic-link-labs" authors = [ { name = "Microsoft Corporation" }, ] -version="0.8.6" +version="0.8.11" description="Semantic Link Labs for Microsoft Fabric" readme="README.md" requires-python=">=3.10,<3.12" diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py index d8b6abf2..f7015193 100644 --- a/src/sempy_labs/__init__.py +++ b/src/sempy_labs/__init__.py @@ -141,6 +141,8 @@ from sempy_labs._dax import ( evaluate_dax_impersonation, trace_dax, + dax_perf_test, + run_benchmark, ) from sempy_labs._generate_semantic_model import ( create_blank_semantic_model, @@ -381,4 +383,6 @@ "create_resource_group", "trace_dax", "save_semantic_model_metadata", + "dax_perf_test", + "run_benchmark", ] From 6395b7eef3e865ba75c855808098684a53909bf4 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 5 Jan 2025 16:00:00 +0200 Subject: [PATCH 24/24] updated dax_perf_test for kay --- src/sempy_labs/_dax.py | 140 ++++++++++++----------------------------- 1 file changed, 40 insertions(+), 100 deletions(-) diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py index 54a6d0c7..45e7015c 100644 --- a/src/sempy_labs/_dax.py +++ b/src/sempy_labs/_dax.py @@ -470,13 +470,13 @@ def trace_dax( def dax_perf_test( dataset: str, dax_queries: dict, - cache_type: str = "warm", + clear_cache_before_run: bool = False, + refresh_type: Optional[str] = None, rest_time: int = 2, workspace: Optional[str] = None, ) -> Tuple[pd.DataFrame, dict]: """ - Runs a warm/cold cache test against a single or set of DAX queries. Valid for import-only or Direct Lake semantic models. - Cold-cache testing is only available for Direct Lake semantic models. + Runs a performance test on a set of DAX queries. Parameters ---------- dataset : str Name of the semantic model. dax_queries : dict The DAX queries to run in a dictionary format. Here is an example: { "Sales Amount Test": """ EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount]) """, "Order Quantity with Product": """ EVALUATE SUMMARIZECOLUMNS('Product'[Color], "Order Qty", [Order Qty]) """, } - cache_type : str, default="warm" - Allows testing for 'warm' or 'cold' cache scenarios. 'Cold' cache testing is only available for Direct Lake semantic models. + clear_cache_before_run : bool, default=False + If True, clears the cache before running each DAX query. + refresh_type : str, default=None + If specified, runs a refresh of this type (for example 'full') against the semantic model before each DAX query. Defaults to None, which runs no refresh. rest_time : int, default=2 Rest time (in seconds) between the execution of each DAX query. workspace : str, default=None The Fabric workspace name. Defaults to None which resolves to the workspace of the attached lakehouse or if no lakehouse attached, resolves to the workspace of the notebook. Returns ------- Tuple[pandas.DataFrame, dict] A pandas dataframe showing the SQL profiler trace results of the DAX queries. A dictionary of the query results in pandas dataframes.
""" - from sempy_labs.tom import connect_semantic_model from sempy_labs._refresh_semantic_model import refresh_semantic_model - - sempy.fabric._client._utils._init_analysis_services() - import Microsoft.AnalysisServices.Tabular as TOM + from sempy_labs._clear_cache import clear_cache if workspace is None: workspace = fabric.resolve_workspace_name() - cache_type = _validate_cache_type(cache_type) - - dl_tables = [] - with connect_semantic_model( - dataset=dataset, workspace=workspace, readonly=True - ) as tom: - for p in tom.all_partitions(): - if p.Mode == TOM.ModeType.DirectLake: - dl_tables.append(p.Parent.Name) - elif p.Mode == TOM.ModeType.DirectQuery or ( - p.Mode == TOM.ModeType.Default - and tom.model.Model.DefaultMode == TOM.ModeType.DirectQuery - ): - raise ValueError( - f"{icons.red_dot} This testing is only for Import & Direct Lake semantic models." - ) - - if cache_type != "warm" and not tom.is_direct_lake(): - raise ValueError( - f"{icons.red_dot} Cold cache testing is only available for Direct Lake semantic models." - ) - base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"] begin_cols = base_cols + ["StartTime"] end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"] @@ -548,7 +523,6 @@ def dax_perf_test( event_schema["VertiPaqSEQueryCacheMatch"] = base_cols query_results = {} - evaluate_one = """ EVALUATE {1}""" # Establish trace connection with fabric.create_trace_connection( @@ -560,43 +534,21 @@ def dax_perf_test( # Loop through DAX queries for i, (name, dax) in enumerate(dax_queries.items()): - # Cold Cache Direct Lake - if dl_tables and cache_type == "cold": - # Process Clear - refresh_semantic_model( - dataset=dataset, - workspace=workspace, - refresh_type="clearValues", - tables=dl_tables, - ) - # Process Full - refresh_semantic_model( - dataset=dataset, workspace=workspace, refresh_type="full" - ) - # Evaluate {1} - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=evaluate_one - ) - # Run DAX Query - result = fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) - else: - # Run DAX Query - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax - ) - # Clear Cache + if clear_cache_before_run: clear_cache(dataset=dataset, workspace=workspace) - # Evaluate {1} - fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=evaluate_one - ) - # Run DAX Query - result = fabric.evaluate_dax( - dataset=dataset, workspace=workspace, dax_string=dax + if refresh_type is not None: + refresh_semantic_model( + dataset=dataset, workspace=workspace, refresh_type=refresh_type ) + fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string="""EVALUATE {1}""" + ) + # Run DAX Query + result = fabric.evaluate_dax( + dataset=dataset, workspace=workspace, dax_string=dax + ) + # Add results to output query_results[name] = result @@ -607,41 +559,29 @@ def dax_perf_test( # Allow time to collect trace results time.sleep(5) + # Step 1: Filter out unnecessary operations query_names = list(dax_queries.keys()) - - # DL Cold Cache - if dl_tables and cache_type == "cold": - # Filter out unnecessary operations - df = df[ - ~df["Application Name"].isin(["PowerBI", "PowerBIEIM"]) - & (~df["Text Data"].str.startswith("EVALUATE {1}")) - ] - query_begin = df["Event Class"] == "QueryBegin" - # Name queries per dictionary - df["Query Name"] = (query_begin).cumsum() - df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() - df["Query Name"] = 
pd.to_numeric(df["Query Name"], downcast="integer") - df["Query Name"] = df["Query Name"].map(lambda x: query_names[x - 1]) - else: - # Filter out unnecessary operations - df = df[(~df["Text Data"].str.startswith("EVALUATE {1}"))] - query_begin = df["Event Class"] == "QueryBegin" - # Name queries per dictionary - suffix = "_removeXXX" - query_names_full = [ - item - for query in query_names - for item in (f"{query}{suffix}", query) - ] - # Step 3: Assign query names by group and convert to integer - df["Query Name"] = (query_begin).cumsum() - df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() - df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") - # Step 4: Map to full query names - df["Query Name"] = df["Query Name"].map( - lambda x: query_names_full[x - 1] - ) - df = df[~df["Query Name"].str.endswith(suffix)] + df = df[ + ~df["Application Name"].isin(["PowerBI", "PowerBIEIM"]) + & (~df["Text Data"].str.startswith("EVALUATE {1}")) + ] + query_begin = df["Event Class"] == "QueryBegin" + # Step 2: Name queries per dictionary + suffix = "_removeXXX" + query_names_full = [ + item + for query in query_names + for item in (f"{query}{suffix}", query) + ] + # Step 3: Assign query names by group and convert to integer + df["Query Name"] = (query_begin).cumsum() + df["Query Name"] = df["Query Name"].where(query_begin, None).ffill() + df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer") + # Step 4: Map to full query names + df["Query Name"] = df["Query Name"].map( + lambda x: query_names_full[x - 1] + ) + df = df[~df["Query Name"].str.endswith(suffix)] df = df.reset_index(drop=True)
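
For reference, a minimal usage sketch of the dax_perf_test function as it stands after these patches. This is not part of any patch; the semantic model name, workspace name, and DAX query texts are illustrative assumptions, and the call assumes a Fabric notebook with this version of semantic-link-labs installed.

import sempy_labs as labs

# Two sample DAX queries to benchmark (hypothetical measure and column names).
queries = {
    "Sales Amount Test": """EVALUATE SUMMARIZECOLUMNS("Sales Amount", [Sales Amount])""",
    "Order Quantity with Product": """EVALUATE SUMMARIZECOLUMNS('Product'[Color], "Order Qty", [Order Qty])""",
}

# Each query runs inside a SQL Profiler trace; optionally the cache is cleared
# and/or a refresh (e.g. 'full') is executed before each query, with a short
# rest between queries.
trace_df, query_results = labs.dax_perf_test(
    dataset="AdventureWorks",       # assumed semantic model name
    dax_queries=queries,
    clear_cache_before_run=True,
    refresh_type=None,
    rest_time=2,
    workspace="My Workspace",       # assumed workspace name
)

# trace_df labels each trace event with a 'Query Name' column taken from the
# dictionary keys; query_results maps each query name to its result dataframe.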