Skip to content

Commit

Permalink
fix: pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Nov 13, 2024
1 parent ee56ad8 commit 6170060
Show file tree
Hide file tree
Showing 8 changed files with 1,650 additions and 86 deletions.
4 changes: 2 additions & 2 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,6 @@
"name": "cassandra",
"displayName": "CassandraDB",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
"recipe": "source:\n type: cassandra\n config:\n # Coordinates\n contact_point: 'localhost'\n port: 9042\n # Credentials (ensure the user has read access to the system_schema keyspace)\n username: null\n password: null\n\n # Optional Allow / Deny extraction of particular keyspaces.\n # keyspace_pattern:\n # allow:\n # - \".*\""
},
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
}
]
13 changes: 6 additions & 7 deletions metadata-ingestion/docs/sources/cassandra/cassandra_recipe.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
source:
type: "cassandra"
config:
# Coordinates
# Credentials for on prem cassandra
contact_point: "localhost"
port: 9042

# Credentials
username: "admin"
password: "password"

#datastax astra db
#datastax_astra_cloud_config:
# Or
# Credentials Astra Cloud
#cloud_config:
# secure_connect_bundle: "Path to Secure Connect Bundle (.zip)"
# token: "Application Token"

# Optional
# Optional Allow / Deny extraction of particular keyspaces.
keyspace_pattern:
allow: [".*"]

# Optional
# Optional Allow / Deny extraction of particular tables.
table_pattern:
allow: [".*"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from datahub.ingestion.source.cassandra.cassandra_config import CassandraSourceConfig
from datahub.ingestion.source.cassandra.cassandra_profiling import CassandraProfiler
from datahub.ingestion.source.cassandra.cassandra_utils import (
CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES,
COL_NAMES,
SYSTEM_KEYSPACE_LIST,
VERSION,
CassandraToSchemaFieldConverter,
Expand Down Expand Up @@ -99,12 +99,12 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
else:
raise KeyError(f"Unknown entity {ent_type}.")

def set_ingestion_stage(self, dataset: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{dataset}: {stage}")
def set_ingestion_stage(self, keyspace: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{keyspace}: {stage}")


@dataclass
class CassandraEntites:
class CassandraEntities:
keyspaces: List[str] = field(default_factory=list)
tables: Dict[str, List[str]] = field(
default_factory=dict
Expand Down Expand Up @@ -144,7 +144,7 @@ def __init__(self, ctx: PipelineContext, config: CassandraSourceConfig):
self.config = config
self.report = CassandraSourceReport()
self.cassandra_api = CassandraAPIInterface(config, self.report)
self.cassandra_data = CassandraEntites()
self.cassandra_data = CassandraEntities()
# For profiling
self.profiler = CassandraProfiler(config, self.report, self.cassandra_api)

Expand All @@ -169,9 +169,7 @@ def get_workunits_internal(
) -> Iterable[MetadataWorkUnit]:
keyspaces = self.cassandra_api.get_keyspaces()
for keyspace in keyspaces:
keyspace_name: str = getattr(
keyspace, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["keyspace_name"]
)
keyspace_name: str = getattr(keyspace, COL_NAMES["keyspace_name"])
if keyspace_name in SYSTEM_KEYSPACE_LIST:
continue

Expand Down Expand Up @@ -205,6 +203,7 @@ def get_workunits_internal(
if self.config.is_profiling_enabled():
for keyspace in self.cassandra_data.keyspaces:
tables = self.cassandra_data.tables.get(keyspace, [])
self.report.set_ingestion_stage(keyspace, PROFILING)
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
Expand Down Expand Up @@ -252,9 +251,7 @@ def _extract_tables_from_keyspace(
tables = self.cassandra_api.get_tables(keyspace_name)
for table in tables:
# define the dataset urn for this table to be used downstream
table_name: str = getattr(
table, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["table_name"]
)
table_name: str = getattr(table, COL_NAMES["table_name"])
dataset_name: str = f"{keyspace_name}.{table_name}"

if not self.config.table_pattern.allowed(dataset_name):
Expand Down Expand Up @@ -377,9 +374,7 @@ def _extract_views_from_keyspace(

views = self.cassandra_api.get_views(keyspace_name)
for view in views:
view_name: str = getattr(
view, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["view_name"]
)
view_name: str = getattr(view, COL_NAMES["view_name"])
dataset_name: str = f"{keyspace_name}.{view_name}"
self.report.report_entity_scanned(dataset_name)
dataset_urn: str = make_dataset_urn_with_platform_instance(
Expand Down Expand Up @@ -446,7 +441,7 @@ def _extract_views_from_keyspace(
# NOTE: we don't need to use 'base_table_id' since table is always in same keyspace, see https://docs.datastax.com/en/cql-oss/3.3/cql/cql_reference/cqlCreateMaterializedView.html#cqlCreateMaterializedView__keyspace-name
upstream_urn: str = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=f"{keyspace_name}.{getattr(view, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES['base_table_name'])}",
name=f"{keyspace_name}.{getattr(view, COL_NAMES['base_table_name'])}",
env=self.config.env,
platform_instance=self.config.platform_instance,
)
Expand Down Expand Up @@ -492,7 +487,6 @@ def generate_profiles(
env=self.config.env,
platform_instance=self.config.platform_instance,
)
self.report.set_ingestion_stage(dataset_name, PROFILING)
yield from self.profiler.get_workunits(dataset_urn, keyspace, table_name)

def get_upstream_fields_of_field_in_datasource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.cassandra.cassandra_config import CassandraSourceConfig
from datahub.ingestion.source.cassandra.cassandra_utils import (
CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES,
COL_NAMES,
CassandraQueries,
)

Expand All @@ -28,8 +28,8 @@ def __init__(self, config: CassandraSourceConfig, report: SourceReport):
def authenticate(self) -> Session:
"""Establish a connection to Cassandra and return the session."""
try:
if self.config.cloud:
cloud_config = self.config.datastax_astra_cloud_config
if self.config.cloud_config:
cloud_config = self.config.cloud_config
assert cloud_config
cluster_cloud_config = {
"connect_timeout": cloud_config.connect_timeout,
Expand All @@ -51,40 +51,35 @@ def authenticate(self) -> Session:
session: Session = cluster.connect()
return session

auth_provider = None
ssl_context = None
if self.config.username and self.config.password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.verify_mode = CERT_NONE
auth_provider = PlainTextAuthProvider(
username=self.config.username, password=self.config.password
)

cluster = Cluster(
[self.config.contact_point],
port=self.config.port,
load_balancing_policy=None,
)

if auth_provider:
cluster = Cluster(
[self.config.contact_point],
port=self.config.port,
auth_provider=auth_provider,
ssl_context=ssl_context,
load_balancing_policy=None,
)
else:
cluster = Cluster(
[self.config.contact_point],
port=self.config.port,
load_balancing_policy=None,
)

session = cluster.connect()
return session
except OperationTimedOut as e:
self.report.warning(
message="Failed to Autheticate", context=f"{str(e.errors)}", exc=e
message="Failed to Authenticate", context=f"{str(e.errors)}", exc=e
)
raise
except DriverException as e:
self.report.warning(
message="Failed to Autheticate", context=f"{str(e)}", exc=e
)
self.report.warning(message="Failed to Authenticate", exc=e)
raise
except Exception as e:
self.report.report_failure(
Expand All @@ -100,9 +95,7 @@ def get_keyspaces(self) -> List:
)
keyspaces = sorted(
keyspaces,
key=lambda k: getattr(
k, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["keyspace_name"]
),
key=lambda k: getattr(k, COL_NAMES["keyspace_name"]),
)
return keyspaces
except DriverException as e:
Expand All @@ -124,9 +117,7 @@ def get_tables(self, keyspace_name: str) -> List:
)
tables = sorted(
tables,
key=lambda t: getattr(
t, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["table_name"]
),
key=lambda t: getattr(t, COL_NAMES["table_name"]),
)
return tables
except DriverException as e:
Expand Down Expand Up @@ -171,9 +162,7 @@ def get_views(self, keyspace_name: str) -> List:
)
views = sorted(
views,
key=lambda v: getattr(
v, CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["view_name"]
),
key=lambda v: getattr(v, COL_NAMES["view_name"]),
)
return views
except DriverException as e:
Expand All @@ -188,7 +177,7 @@ def get_views(self, keyspace_name: str) -> List:
)
return []

def execute(self, query: str, limit: Optional[int]) -> List:
def execute(self, query: str, limit: Optional[int] = None) -> List:
"""Fetch stats for cassandra"""
try:
if limit:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class CassandraSourceConfig(
description="Password credential associated with the specified username.",
)

datastax_astra_cloud_config: Optional[CassandraCloudConfig] = Field(
cloud_config: Optional[CassandraCloudConfig] = Field(
default=None,
description="Configuration for cloud-based Cassandra, such as DataStax Astra DB.",
)
Expand Down Expand Up @@ -176,14 +176,3 @@ def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

@property
def cloud(self) -> bool:
"""
Returns True if datastax_astra_cloud_config is present.
"""
return (
self.datastax_astra_cloud_config is not None
and bool(self.datastax_astra_cloud_config.secure_connect_bundle)
and bool(self.datastax_astra_cloud_config.token)
)
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def profile_table(

if self.config.profiling.row_count:
resp = self.api.execute(
CassandraQueries.ROW_COUNT.format(keyspace_name, table_name), limit
CassandraQueries.ROW_COUNT.format(keyspace_name, table_name)
)
if resp:
results["row_count"] = resp[0].row_count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
VERSION: str = "[version=2.0]"

# these column names are present on the system_schema tables
CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES = {
COL_NAMES = {
"keyspace_name": "keyspace_name", # present on all tables
"table_name": "table_name", # present on tables table
"column_name": "column_name", # present on columns table
Expand Down Expand Up @@ -129,13 +129,8 @@ def _get_schema_fields(
if hasattr(column_info, "_asdict")
else column_info
)
column_info["column_name_bytes"] = None
column_name: str = column_info[
CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["column_name"]
]
cassandra_type: str = column_info[
CASSANDRA_SYSTEM_SCHEMA_COLUMN_NAMES["column_type"]
]
column_name: str = column_info[COL_NAMES["column_name"]]
cassandra_type: str = column_info[COL_NAMES["column_type"]]

if cassandra_type is not None:
self._prefix_name_stack.append(f"[type={cassandra_type}].{column_name}")
Expand Down
Loading

0 comments on commit 6170060

Please sign in to comment.