Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 committed Feb 7, 2024
1 parent db3d62d commit 6c48007
Show file tree
Hide file tree
Showing 7 changed files with 1,230 additions and 346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Constant:
ID = "id"
NAME = "name"
TYPE = "type"
ITEMID = "itemId"
DATASETTYPE = "datasetType"
CREATEDAT = "createdAt"
UPDATEDAT = "updatedAt"
SECUREQRI = "secureQri"
Expand All @@ -52,10 +54,17 @@ class Constant:
MODIFIEDDATE = "modifiedDate"
RESOURCEID = "resourceId"
DATASETSCHEMA = "datasetSchema"
GRAPH = "graph"
NODES = "nodes"
RESOURCES = "resources"
LINEAGE = "lineage"
TABLELABEL = "tableLabel"
TABLEQRI = "tableQRI"
# Websocket response key constants
QID = "qId"
RESULT = "result"
QRETURN = "qReturn"
QTYPE = "qType"
QHANDLE = "qHandle"
QLAYOUT = "qLayout"
QMETA = "qMeta"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class SpaceType(Enum):
DATA = "data"


class BoxType(Enum):
    """Qlik table box type, i.e. where a table used by an app was extracted from."""

    # Table extracted from dataset
    LOADFILE = "load-file"
    # Table extracted from data connection
    BLACKBOX = "blackbox"


PERSONAL_SPACE_DICT = {
"id": Constant.PERSONAL_SPACE_ID,
"name": Constant.PERSONAL_SPACE_NAME,
Expand Down Expand Up @@ -118,6 +124,7 @@ class QlikDataset(Item):
type: str
size: int
rowCount: int
itemId: str
datasetSchema: List[SchemaField]

@root_validator(pre=True)
Expand All @@ -140,9 +147,26 @@ def update_values(cls, values: Dict) -> Dict:
return values


class AxisProperty(BaseModel):
    """One axis entry of a chart; used for both the dimension and the measure lists of ``Chart``.

    Every field is populated from the Qlik key given as its alias.
    """

    # Read from "qFallbackTitle"
    Title: str = Field(alias="qFallbackTitle")
    # Read from "qMin"
    Min: str = Field(alias="qMin")
    # Read from "qMax"
    Max: str = Field(alias="qMax")


class Chart(BaseModel):
    """A chart of a sheet, parsed from the chart object's layout dictionary.

    The raw layout nests the identifiers under ``qInfo`` and the axes under
    ``qHyperCube``; ``update_values`` copies them to the top-level keys that
    the fields below are read from.
    """

    qId: str
    qType: str
    visualization: str
    title: str
    subtitle: str
    qDimension: List[AxisProperty]
    qMeasure: List[AxisProperty]

    @root_validator(pre=True)
    def update_values(cls, values: Dict) -> Dict:
        """Flatten the nested layout keys before field validation runs.

        Raises ``KeyError`` when ``qInfo`` or ``qHyperCube`` (or the expected
        keys inside them) are missing from ``values``.
        """
        q_info = values[Constant.QINFO]
        values[Constant.QID] = q_info[Constant.QID]
        # qType is a required field but was never filled in here. Keep a
        # top-level value when one is supplied, otherwise fall back to qInfo.
        # NOTE(review): presumably qInfo carries qType next to qId - confirm.
        if Constant.QTYPE not in values:
            values[Constant.QTYPE] = q_info.get(Constant.QTYPE)
        hyper_cube = values["qHyperCube"]
        values["qDimension"] = hyper_cube["qDimensionInfo"]
        values["qMeasure"] = hyper_cube["qMeasureInfo"]
        return values


class Sheet(BaseModel):
Expand All @@ -165,28 +189,33 @@ def update_values(cls, values: Dict) -> Dict:
return values


class QlikAppDataset(BaseModel):
class QlikTable(BaseModel):
tableName: str
schemaName: str
databaseName: str
type: BoxType = Field(alias="boxType")
tableAlias: str
dataconnectorid: str
dataconnectorName: str
dataconnectorPlatform: str
spaceId: str
datasetSchema: List[SchemaField] = Field(alias="fields")
tableQri: Optional[str] = None
schemaName: Optional[str] = None

@root_validator(pre=True)
def update_values(cls, values: Dict) -> Dict:
values[Constant.DATABASENAME] = values[Constant.CONNECTORPROPERTIES][
Constant.TABLEQUALIFIERS
][0]
values[Constant.SCHEMANAME] = values[Constant.CONNECTORPROPERTIES][
Constant.TABLEQUALIFIERS
][1]
if values["boxType"] == BoxType.BLACKBOX.value:
values[Constant.DATABASENAME] = values[Constant.CONNECTORPROPERTIES][
Constant.TABLEQUALIFIERS
][0]
values[Constant.SCHEMANAME] = values[Constant.CONNECTORPROPERTIES][
Constant.TABLEQUALIFIERS
][1]
values[Constant.DATACONNECTORID] = values[Constant.CONNECTIONINFO][Constant.ID]
values[Constant.DATACONNECTORPLATFORM] = values[Constant.CONNECTIONINFO][
Constant.SOURCECONNECTORID
]
values[Constant.SPACEID] = values[Constant.CONNECTIONINFO]["space"]
return values


Expand All @@ -195,7 +224,7 @@ class App(Item):
qri: str
qUsage: str
sheets: List[Sheet] = []
datasets: List[QlikAppDataset] = []
tables: List[QlikTable] = []

@root_validator(pre=True)
def update_values(cls, values: Dict) -> Dict:
Expand Down
138 changes: 108 additions & 30 deletions metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import sys
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests

Expand All @@ -10,8 +11,8 @@
App,
Chart,
Item,
QlikAppDataset,
QlikDataset,
QlikTable,
Sheet,
Space,
)
Expand Down Expand Up @@ -60,18 +61,20 @@ def get_spaces(self) -> List[Space]:
self.spaces[PERSONAL_SPACE_DICT[Constant.ID]] = PERSONAL_SPACE_DICT[
Constant.NAME
]
except Exception:
self._log_http_error(message="Unable to fetch spaces")
except Exception as e:
self._log_http_error(message=f"Unable to fetch spaces. Exception: {e}")
return spaces

def _get_dataset(self, dataset_id: str) -> Optional[QlikDataset]:
def _get_dataset(self, dataset_id: str, item_id: str) -> Optional[QlikDataset]:
try:
response = self.session.get(f"{self.rest_api_url}/data-sets/{dataset_id}")
response.raise_for_status()
return QlikDataset.parse_obj(response.json())
except Exception:
response_dict = response.json()
response_dict[Constant.ITEMID] = item_id
return QlikDataset.parse_obj(response_dict)
except Exception as e:
self._log_http_error(
message=f"Unable to fetch dataset with id {dataset_id}"
message=f"Unable to fetch dataset with id {dataset_id}. Exception: {e}"
)
return None

Expand All @@ -86,8 +89,28 @@ def get_user_name(self, user_id: str) -> Optional[str]:
user_name = response.json()[Constant.NAME]
self.users[user_id] = user_name
return user_name
except Exception:
self._log_http_error(message=f"Unable to fetch user with id {user_id}")
except Exception as e:
self._log_http_error(
message=f"Unable to fetch user with id {user_id}. Exception: {e}"
)
return None

def _get_chart(
    self,
    websocket_connection: WebsocketConnection,
    chart_id: str,
    sheet_id: str,
) -> Optional[Chart]:
    """Fetch one chart of a sheet over the websocket connection.

    Sends ``GetChild`` for ``chart_id`` followed by ``GetLayout`` and parses
    the ``qLayout`` part of the reply into a ``Chart``.

    Args:
        websocket_connection: Connection the requests are sent on.
        chart_id: ``qId`` of the chart to fetch.
        sheet_id: Id of the owning sheet; only used in the error message.

    Returns:
        The parsed chart, or ``None`` when any step raises (the failure is
        passed to ``_log_http_error`` instead of propagating).
    """
    # NOTE(review): nothing is popped from websocket_connection.handle here;
    # presumably the caller is responsible for that - confirm.
    try:
        websocket_connection.websocket_send_request(
            method="GetChild", params={"qId": chart_id}
        )
        response = websocket_connection.websocket_send_request(method="GetLayout")
        return Chart.parse_obj(response[Constant.QLAYOUT])
    except Exception as e:
        self._log_http_error(
            message=f"Unable to fetch chart {chart_id} of sheet {sheet_id}. Exception: {e}"
        )
    return None

def _get_sheet(
Expand All @@ -102,33 +125,83 @@ def _get_sheet(
response = websocket_connection.websocket_send_request(method="GetLayout")
sheet_dict = response[Constant.QLAYOUT]
sheet = Sheet.parse_obj(sheet_dict[Constant.QMETA])
i = 1
for chart_dict in sheet_dict[Constant.QCHILDLIST][Constant.QITEMS]:
sheet.charts.append(Chart.parse_obj(chart_dict[Constant.QINFO]))
chart = self._get_chart(
websocket_connection,
chart_dict[Constant.QINFO][Constant.QID],
sheet_id,
)
if chart:
if not chart.title:
chart.title = f"Object {i}"
i += 1
sheet.charts.append(chart)
websocket_connection.handle.pop()
return sheet
except Exception:
self._log_http_error(message=f"Unable to fetch sheet with id {sheet_id}")
except Exception as e:
self._log_http_error(
message=f"Unable to fetch sheet with id {sheet_id}. Exception: {e}"
)
return None

def _get_app_used_datasets(
def _add_qri_of_tables(self, tables: List[QlikTable], app_id: str) -> None:
    """Set ``tableQri`` on every table whose name appears in the app's lineage.

    Steps, all against the lineage-graphs REST endpoints of the app node:
      1. expand the app node to TABLE level to list its table nodes;
      2. expand each table node to FIELD level and take its first field node;
      3. post that field node to the overview endpoint and record the
         ``tableLabel`` -> ``tableQRI`` pairs of the first returned resource.
    Tables are then matched to the recorded pairs by ``tableName``.

    Args:
        tables: Tables of the app; updated in place.
        app_id: Id of the app, used to build the app QRI.

    Any exception raised while querying is passed to ``_log_http_error``.
    QRIs gathered before the failure are still applied to ``tables``.
    """
    table_qri_dict: Dict[str, str] = {}
    # QRIs are embedded in the URL path/query, so they are fully percent-encoded.
    app_qri = quote(f"qri:app:sense://{app_id}", safe="")
    lineage_url = f"{self.rest_api_url}/lineage-graphs/nodes/{app_qri}"
    try:
        table_nodes_response = self.session.get(
            f"{lineage_url}/actions/expand?node={app_qri}&level=TABLE"
        )
        table_nodes_response.raise_for_status()
        table_nodes = table_nodes_response.json()[Constant.GRAPH][Constant.NODES]
        for table_node_qri in table_nodes:
            quoted_table_qri = quote(table_node_qri, safe="")
            field_nodes_response = self.session.get(
                f"{lineage_url}/actions/expand?node={quoted_table_qri}&level=FIELD"
            )
            field_nodes_response.raise_for_status()
            field_nodes = field_nodes_response.json()[Constant.GRAPH][Constant.NODES]
            if not field_nodes:
                # No field node to look the table up with; skip only this
                # table instead of failing the lookup for all of them.
                continue
            field_node_qri = next(iter(field_nodes))
            overview_response = self.session.post(
                f"{lineage_url}/overview",
                json=[field_node_qri],
            )
            overview_response.raise_for_status()
            resources = overview_response.json()[Constant.RESOURCES]
            if not resources:
                continue
            # A single QRI was posted, so only the first resource is relevant.
            for each_lineage in resources[0][Constant.LINEAGE]:
                table_qri_dict[each_lineage[Constant.TABLELABEL]] = each_lineage[
                    Constant.TABLEQRI
                ]
    except Exception as e:
        self._log_http_error(
            message=f"Unable to add QRI for tables of app {app_id}. Exception: {e}"
        )
    # Applied outside the try block so a failure on one node does not throw
    # away the QRIs that were already resolved.
    for table in tables:
        if table.tableName in table_qri_dict:
            table.tableQri = table_qri_dict[table.tableName]

def _get_app_used_tables(
self, websocket_connection: WebsocketConnection, app_id: str
) -> List[QlikAppDataset]:
datasets: List[QlikAppDataset] = []
) -> List[QlikTable]:
tables: List[QlikTable] = []
try:
websocket_connection.websocket_send_request(
response = websocket_connection.websocket_send_request(
method="GetObject",
params=["LoadModel"],
)
if not response[Constant.QRETURN][Constant.QTYPE]:
return []
response = websocket_connection.websocket_send_request(method="GetLayout")
for table_dict in response[Constant.QLAYOUT][Constant.TABLES]:
# Condition to Add connection based table only
if table_dict["boxType"] == "blackbox":
datasets.append(QlikAppDataset.parse_obj(table_dict))
tables.append(QlikTable.parse_obj(table_dict))
websocket_connection.handle.pop()
except Exception:
self._add_qri_of_tables(tables, app_id)
except Exception as e:
self._log_http_error(
message=f"Unable to fetch app used datasets for app {app_id}"
message=f"Unable to fetch tables used by app {app_id}. Exception: {e}"
)
return datasets
return tables

def _get_app_sheets(
self, websocket_connection: WebsocketConnection, app_id: str
Expand All @@ -151,8 +224,10 @@ def _get_app_sheets(
if sheet:
sheets.append(sheet)
websocket_connection.handle.pop()
except Exception:
self._log_http_error(message=f"Unable to fetch sheets for app {app_id}")
except Exception as e:
self._log_http_error(
message=f"Unable to fetch sheets for app {app_id}. Exception: {e}"
)
return sheets

def _get_app(self, app_id: str) -> Optional[App]:
Expand All @@ -169,11 +244,13 @@ def _get_app(self, app_id: str) -> Optional[App]:
)
app = App.parse_obj(response[Constant.QLAYOUT])
app.sheets = self._get_app_sheets(websocket_connection, app_id)
app.datasets = self._get_app_used_datasets(websocket_connection, app_id)
app.tables = self._get_app_used_tables(websocket_connection, app_id)
websocket_connection.close_websocket()
return app
except Exception:
self._log_http_error(message=f"Unable to fetch app with id {app_id}")
except Exception as e:
self._log_http_error(
message=f"Unable to fetch app with id {app_id}. Exception: {e}"
)
return None

def get_items(self) -> List[Item]:
Expand All @@ -196,11 +273,12 @@ def get_items(self) -> List[Item]:
items.append(app)
elif resource_type == Constant.DATASET:
dataset = self._get_dataset(
dataset_id=item[Constant.RESOURCEID]
dataset_id=item[Constant.RESOURCEID],
item_id=item[Constant.ID],
)
if dataset:
items.append(dataset)

except Exception:
self._log_http_error(message="Unable to fetch items")
except Exception as e:
self._log_http_error(message=f"Unable to fetch items. Exception: {e}")
return items
Loading

0 comments on commit 6c48007

Please sign in to comment.