From 56b339ad8dd544ede318f0feda2eb78e73dc7f16 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Fri, 8 Nov 2024 17:48:55 +0530 Subject: [PATCH] fix: linting error --- .../src/datahub/ingestion/source/cassandra/cassandra.py | 9 +++++---- .../ingestion/source/cassandra/cassandra_utils.py | 6 +++--- metadata-ingestion/tests/unit/test_cassandra_source.py | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py index 2c898f06ef5d9..7d94c86a224c0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra.py @@ -1,7 +1,7 @@ import json import logging from dataclasses import dataclass, field -from typing import Any, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional from datahub.emitter.mce_builder import ( make_data_platform_urn, @@ -43,7 +43,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) -from datahub.metadata._schema_classes import DatasetPropertiesClass, TimeStampClass from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass from datahub.metadata.com.linkedin.pegasus2avro.schema import ( SchemaField, @@ -52,8 +51,10 @@ from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, DatasetLineageTypeClass, + DatasetPropertiesClass, OtherSchemaClass, SubTypesClass, + TimeStampClass, UpstreamClass, UpstreamLineageClass, ) @@ -261,7 +262,7 @@ def _extract_columns_from_table( self, keyspace_name: str, table_name: str, dataset_urn: str ) -> Iterable[MetadataWorkUnit]: column_infos = self.cassandra_api.get_columns(keyspace_name, table_name) - schema_fields: list[SchemaField] = list( + schema_fields: List[SchemaField] = list( CassandraToSchemaFieldConverter.get_schema_fields(column_infos) ) if not schema_fields: @@ -272,7 +273,7 @@ def _extract_columns_from_table( return # remove any value that is type bytes, so it can be converted to json - jsonable_column_infos: list[dict[str, Any]] = [] + jsonable_column_infos: List[Dict[str, Any]] = [] for column in column_infos: column_dict = column._asdict() jsonable_column_dict = column_dict.copy() diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py index 4514ec3f89671..c2cb6ec41ffcd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py @@ -1,6 +1,6 @@ import json import logging -from typing import Any, Dict, Generator, List, Optional, Type +from typing import Dict, Generator, List, Optional, Type from datahub.metadata.com.linkedin.pegasus2avro.schema import ( SchemaField, @@ -122,7 +122,7 @@ def _get_cur_field_path(self) -> str: return ".".join(self._prefix_name_stack) def _get_schema_fields( - self, cassandra_column_infos: List[dict[str, Any]] + self, cassandra_column_infos: List ) -> Generator[SchemaField, None, None]: # append each schema field (sort so output is consistent) for column_info in cassandra_column_infos: @@ -166,7 +166,7 @@ def _get_schema_fields( @classmethod def get_schema_fields( - cls, cassandra_column_infos: List[dict[str, Any]] + cls, cassandra_column_infos: List ) -> Generator[SchemaField, None, None]: converter = cls() yield from converter._get_schema_fields(cassandra_column_infos) diff --git a/metadata-ingestion/tests/unit/test_cassandra_source.py b/metadata-ingestion/tests/unit/test_cassandra_source.py index 07b40526005dd..a83b40f856fbb 100644 --- a/metadata-ingestion/tests/unit/test_cassandra_source.py +++ b/metadata-ingestion/tests/unit/test_cassandra_source.py @@ -56,7 +56,7 @@ def test_cassandra_schema_conversion( schema: str, expected_field_paths: List[str] ) -> None: schema_dict: Dict[str, List[Any]] = json.loads(schema) - column_infos: List[dict[str, Any]] = schema_dict["column_infos"] + column_infos: List = schema_dict["column_infos"] actual_fields = list( CassandraToSchemaFieldConverter.get_schema_fields(column_infos) )