Skip to content

Commit

Permalink
Merge branch 'main' into lineage-playwright-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Ashish8689 committed Jan 21, 2025
2 parents e433e97 + 4862308 commit 043f6d7
Show file tree
Hide file tree
Showing 195 changed files with 4,823 additions and 839 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/playwright-mysql-e2e-skip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ on:
- synchronize
- reopened
paths:
- openmetadata-docs/**
- .github/**
- openmetadata-dist/**
- docker/**
- 'openmetadata-docs/**'
- '.github/**'
- 'openmetadata-dist/**'
- 'docker/**'
- '!docker/development/docker-compose.yml'
- '!docker/development/docker-compose-postgres.yml'


jobs:
playwright-ci-mysql:
Expand Down
15 changes: 10 additions & 5 deletions .github/workflows/playwright-mysql-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ on:
- synchronize
- reopened
paths-ignore:
- openmetadata-docs/**
- .github/**
- openmetadata-dist/**
- docker/**
- 'openmetadata-docs/**'
- '.github/**'
- 'openmetadata-dist/**'
- 'docker/**'
- '!docker/development/docker-compose.yml'
- '!docker/development/docker-compose-postgres.yml'


permissions:
contents: read

Expand Down Expand Up @@ -97,7 +100,7 @@ jobs:
working-directory: openmetadata-ui/src/main/resources/ui/
run: yarn --ignore-scripts --frozen-lockfile
- name: Install Playwright Browsers
run: npx playwright@1.44.1 install --with-deps
run: npx playwright@1.48.2 install --with-deps
- name: Run Playwright tests
working-directory: openmetadata-ui/src/main/resources/ui/
run: npx playwright test --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }}
Expand Down Expand Up @@ -164,3 +167,5 @@ jobs:
cd ./docker/development
docker compose down --remove-orphans
sudo rm -rf ${PWD}/docker-volume
10 changes: 6 additions & 4 deletions .github/workflows/playwright-postgresql-e2e-skip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ on:
- synchronize
- reopened
paths:
- openmetadata-docs/**
- .github/**
- openmetadata-dist/**
- docker/**
- 'openmetadata-docs/**'
- '.github/**'
- 'openmetadata-dist/**'
- 'docker/**'
- '!docker/development/docker-compose.yml'
- '!docker/development/docker-compose-postgres.yml'

jobs:
playwright-ci-postgresql:
Expand Down
12 changes: 7 additions & 5 deletions .github/workflows/playwright-postgresql-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ on:
- synchronize
- reopened
paths-ignore:
- openmetadata-docs/**
- .github/**
- openmetadata-dist/**
- docker/**
- 'openmetadata-docs/**'
- '.github/**'
- 'openmetadata-dist/**'
- 'docker/**'
- '!docker/development/docker-compose.yml'
- '!docker/development/docker-compose-postgres.yml'

permissions:
contents: read
Expand Down Expand Up @@ -97,7 +99,7 @@ jobs:
working-directory: openmetadata-ui/src/main/resources/ui/
run: yarn --ignore-scripts --frozen-lockfile
- name: Install Playwright Browsers
run: npx playwright@1.44.1 install --with-deps
run: npx playwright@1.48.2 install --with-deps
- name: Run Playwright tests
working-directory: openmetadata-ui/src/main/resources/ui/
run: npx playwright test --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }}
Expand Down
2 changes: 1 addition & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
"geoalchemy2": "GeoAlchemy2~=0.12",
"google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
"google-cloud-storage": "google-cloud-storage==1.43.0",
"google-cloud-storage": "google-cloud-storage>=1.43.0",
"gcsfs": "gcsfs>=2023.1.0",
"great-expectations": "great-expectations>=0.18.0,<0.18.14",
"grpc-tools": "grpcio-tools>=1.47.2",
Expand Down
14 changes: 8 additions & 6 deletions ingestion/src/metadata/automations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,20 @@ def _(

try:
connection = get_connection(request.connection.config)

host_port_str = str(request.connection.config.hostPort or "")
if "localhost" in host_port_str:
result = test_connection_fn(metadata, connection, request.connection.config)
raise_test_connection_exception(result)
if hasattr(request.connection.config, "hostPort"):
host_port_str = str(request.connection.config.hostPort or "")
if "localhost" in host_port_str:
result = test_connection_fn(
metadata, connection, request.connection.config
)
raise_test_connection_exception(result)

test_connection_fn(
metadata, connection, request.connection.config, automation_workflow
)
except Exception as error:
host_port_str = str(getattr(request.connection.config, "hostPort", None) or "")
if "localhost" not in host_port_str:
if not host_port_str or "localhost" not in host_port_str:
raise error

host_port_type = type(request.connection.config.hostPort)
Expand Down
40 changes: 28 additions & 12 deletions ingestion/src/metadata/clients/aws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Module containing AWS Client
"""
import datetime
from enum import Enum
from functools import partial
from typing import Any, Callable, Dict, Optional, Type, TypeVar
Expand Down Expand Up @@ -47,11 +48,22 @@ class AWSAssumeRoleException(Exception):
"""


class AWSAssumeRoleCredentialResponse(BaseModel):
    """Typed view of the ``Credentials`` mapping returned by STS assume-role.

    Field names mirror the AWS response keys exactly so the model can be
    built directly from the raw ``Credentials`` dict.
    """

    # Always present in an STS Credentials payload.
    AccessKeyId: str
    SecretAccessKey: str
    # Only set for temporary credentials.
    SessionToken: Optional[str] = None
    # When the temporary credentials expire; None if not provided.
    Expiration: Optional[datetime.datetime] = None


class AWSAssumeRoleCredentialWrapper(BaseModel):
accessKeyId: str = Field(alias="access_key")
secretAccessKey: CustomSecretStr = Field(alias="secret_key")
sessionToken: Optional[str] = Field(default=None, alias="token")
expiryTime: Optional[str] = Field(alias="expiry_time")
accessKeyId: str = Field()
secretAccessKey: CustomSecretStr = Field()
sessionToken: Optional[str] = Field(
default=None,
)
expiryTime: Optional[str] = Field()


AWSAssumeRoleCredentialFormat = TypeVar(
Expand Down Expand Up @@ -102,12 +114,14 @@ def get_assume_role_config(
)

if resp:
credentials = resp.get("Credentials", {})
credentials: AWSAssumeRoleCredentialResponse = (
AWSAssumeRoleCredentialResponse(**resp.get("Credentials", {}))
)
creds_wrapper = AWSAssumeRoleCredentialWrapper(
accessKeyId=credentials.get("AccessKeyId"),
secretAccessKey=credentials.get("SecretAccessKey"),
sessionToken=credentials.get("SessionToken"),
expiryTime=credentials.get("Expiration").isformat(),
accessKeyId=credentials.AccessKeyId,
secretAccessKey=credentials.SecretAccessKey,
sessionToken=credentials.SessionToken,
expiryTime=credentials.Expiration.isoformat(),
)
if return_type == Dict:
return creds_wrapper.model_dump(by_alias=True)
Expand Down Expand Up @@ -143,9 +157,11 @@ def _get_session(

return Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key.get_secret_value()
if aws_secret_access_key
else None,
aws_secret_access_key=(
aws_secret_access_key.get_secret_value()
if aws_secret_access_key
else None
),
aws_session_token=aws_session_token,
region_name=aws_region,
profile_name=profile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
Expand Down Expand Up @@ -761,3 +762,85 @@ def get_project_name(self, dashboard_details: Any) -> Optional[str]:
f"Error fetching project name for {dashboard_details.id}: {exc}"
)
return None

def yield_dashboard_lineage(
    self, dashboard_details: Any
) -> Iterable[Either[OMetaLineageRequest]]:
    """
    Yield lineage requests for the given dashboard.

    Successful data-model -> dashboard lineage entries are wrapped into an
    OMetaLineageRequest carrying the configured ``overrideLineage`` flag;
    failed entries are forwarded unchanged. Afterwards, dashboard lineage
    details are emitted for every configured database service.
    """
    for datamodel_lineage in (
        self.yield_datamodel_dashboard_lineage(dashboard_details) or []
    ):
        if datamodel_lineage.right is None:
            # Propagate the error side untouched.
            yield datamodel_lineage
            continue
        yield Either(
            right=OMetaLineageRequest(
                lineage_request=datamodel_lineage.right,
                override_lineage=self.source_config.overrideLineage,
            )
        )

    for service_name in self.get_db_service_names() or []:
        yield from (
            self.yield_dashboard_lineage_details(dashboard_details, service_name)
            or []
        )

def yield_datamodel_dashboard_lineage(
    self, dashboard_details: Any
) -> Iterable[Either[AddLineageRequest]]:
    """
    Yield lineage requests between Data Models and the given Dashboard.

    For a PowerBI report, the single backing dataset (``datasetId``) is
    linked; otherwise every data model collected in the ingestion context
    is linked to the dashboard.

    Args:
        dashboard_details: source payload of the dashboard/report being
            processed; ``.id`` is used to resolve the Dashboard entity.

    Returns:
        Lineage request between Data Models and Dashboards
    """
    dashboard_fqn = fqn.build(
        self.metadata,
        entity_type=Dashboard,
        service_name=self.context.get().dashboard_service,
        dashboard_name=dashboard_details.id,
    )
    dashboard_entity = self.metadata.get_by_name(
        entity=Dashboard, fqn=dashboard_fqn
    )
    if not dashboard_entity:
        # Without the dashboard entity no lineage edge can ever be built:
        # bail out early instead of doing a metadata lookup per data model
        # that can never yield anything.
        return

    if isinstance(dashboard_details, PowerBIReport):
        datamodel_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=DashboardDataModel,
            service_name=self.context.get().dashboard_service,
            data_model_name=dashboard_details.datasetId,
        )
        datamodel_entity = self.metadata.get_by_name(
            entity=DashboardDataModel, fqn=datamodel_fqn
        )
        if datamodel_entity:
            yield self._get_add_lineage_request(
                to_entity=dashboard_entity, from_entity=datamodel_entity
            )
        return

    # getattr with a default replaces the hasattr + second-attribute-access
    # pair; an absent or empty dataModels list yields nothing.
    for datamodel in getattr(self.context.get(), "dataModels", None) or []:
        try:
            datamodel_fqn = fqn.build(
                metadata=self.metadata,
                entity_type=DashboardDataModel,
                service_name=self.context.get().dashboard_service,
                data_model_name=datamodel,
            )
            datamodel_entity = self.metadata.get_by_name(
                entity=DashboardDataModel, fqn=datamodel_fqn
            )
            if datamodel_entity:
                yield self._get_add_lineage_request(
                    to_entity=dashboard_entity, from_entity=datamodel_entity
                )
        except Exception as err:
            # Best-effort per data model: log and continue with the rest.
            logger.debug(traceback.format_exc())
            logger.error(
                f"Error to yield dashboard lineage details for data model name [{str(datamodel)}]: {err}"
            )
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,31 @@ def get_table_partition_details(
database = self.context.get().database
table = self.client.get_table(fqn._build(database, schema_name, table_name))
columns = inspector.get_columns(table_name, schema_name, db_name=database)
if hasattr(table, "external_data_configuration") and hasattr(
table.external_data_configuration, "hive_partitioning"
):
# Ingesting External Hive Partitioned Tables
from google.cloud.bigquery.external_config import ( # pylint: disable=import-outside-toplevel
HivePartitioningOptions,
)

partition_details: HivePartitioningOptions = (
table.external_data_configuration.hive_partitioning
)
return True, TablePartition(
columns=[
PartitionColumnDetails(
columnName=self._get_partition_column_name(
columns=columns,
partition_field_name=field,
),
interval=str(partition_details._properties.get("mode")),
intervalType=PartitionIntervalTypes.OTHER,
)
for field in partition_details._properties.get("fields")
]
)

if table.time_partitioning is not None:
if table.time_partitioning.field:
table_partition = TablePartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def _compute_system_metrics(
return self.system_metrics_computer.get_system_metrics(
table=runner.dataset,
usage_location=self.service_connection_config.usageLocation,
runner=runner,
)

def initialize_system_metrics_computer(self) -> BigQuerySystemMetricsComputer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def yield_pipeline(
sourceUrl=connection_url,
tasks=[
Task(
name=task.id,
name=str(task.id),
)
for task in pipeline_details.tasks or []
],
Expand Down Expand Up @@ -205,7 +205,7 @@ def yield_pipeline_lineage_details(
metadata=self.metadata,
entity_type=Topic,
service_name=self.service_connection.messagingServiceName,
topic_name=topic.name,
topic_name=str(topic.name),
)

topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn)
Expand Down Expand Up @@ -279,7 +279,7 @@ def yield_pipeline_status(
try:
task_status = [
TaskStatus(
name=task.id,
name=str(task.id),
executionStatus=STATUS_MAP.get(task.state, StatusType.Pending),
)
for task in pipeline_details.tasks or []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class KafkaConnectTasks(BaseModel):
default="UNASSIGNED", description="State of the task (e.g., RUNNING, STOPPED)"
)
worker_id: Optional[str] = Field(
..., description="ID of the worker running the task"
default=None, description="ID of the worker running the task"
)


Expand All @@ -43,15 +43,15 @@ class KafkaConnectPipelineDetails(BaseModel):
default="UNASSIGNED",
description="State of the connector (e.g., RUNNING, STOPPED)",
)
tasks: Optional[List[KafkaConnectTasks]]
topics: Optional[List[KafkaConnectTopics]]
conn_type: Optional[str] = Field(..., alias="type")
tasks: Optional[List[KafkaConnectTasks]] = []
topics: Optional[List[KafkaConnectTopics]] = []
conn_type: Optional[str] = Field(default="UNKNOWN", alias="type")


class KafkaConnectDatasetDetails(BaseModel):
table: Optional[str]
database: Optional[str]
container_name: Optional[str]
table: Optional[str] = None
database: Optional[str] = None
container_name: Optional[str] = None

@property
def dataset_type(self):
Expand Down
Loading

0 comments on commit 043f6d7

Please sign in to comment.