From b76f2ab0dc7f995a9bc292f36423261d731758d6 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Mon, 28 Aug 2023 12:13:28 -0400 Subject: [PATCH] feat(ingest/datahub): Improvements, bug fixes, and docs --- .../docs/sources/datahub/README.md | 4 ++ .../docs/sources/datahub/datahub_pre.md | 66 +++++++++++++++++ .../docs/sources/datahub/datahub_recipe.yml | 23 ++++++ .../ingestion/source/datahub/config.py | 30 ++++---- ...l_reader.py => datahub_database_reader.py} | 45 +++++++----- .../source/datahub/datahub_kafka_reader.py | 9 ++- .../source/datahub/datahub_source.py | 70 +++++++++++++------ .../ingestion/source/datahub/report.py | 10 ++- .../datahub/ingestion/source/datahub/state.py | 10 +-- .../datahub/testing/compare_metadata_json.py | 1 - .../state/test_checkpoint.py | 4 +- 11 files changed, 206 insertions(+), 66 deletions(-) create mode 100644 metadata-ingestion/docs/sources/datahub/README.md create mode 100644 metadata-ingestion/docs/sources/datahub/datahub_pre.md create mode 100644 metadata-ingestion/docs/sources/datahub/datahub_recipe.yml rename metadata-ingestion/src/datahub/ingestion/source/datahub/{datahub_mysql_reader.py => datahub_database_reader.py} (67%) diff --git a/metadata-ingestion/docs/sources/datahub/README.md b/metadata-ingestion/docs/sources/datahub/README.md new file mode 100644 index 0000000000000..45afc6e166889 --- /dev/null +++ b/metadata-ingestion/docs/sources/datahub/README.md @@ -0,0 +1,4 @@ +Migrate data from one DataHub instance to another. + +Requires direct access to the database, kafka broker, and kafka schema registry +of the source DataHub instance. diff --git a/metadata-ingestion/docs/sources/datahub/datahub_pre.md b/metadata-ingestion/docs/sources/datahub/datahub_pre.md new file mode 100644 index 0000000000000..6db92801f41e0 --- /dev/null +++ b/metadata-ingestion/docs/sources/datahub/datahub_pre.md @@ -0,0 +1,66 @@ +### Overview + +This source pulls data from two locations: +- The DataHub database, containing a single table holding all versioned aspects +- The DataHub Kafka cluster, reading from the [MCL Log](../../../../docs/what/mxe.md#metadata-change-log-mcl) +topic for timeseries aspects. + +All data is first read from the database, before timeseries data is ingested from kafka. +To prevent this source from potentially running forever, it will not ingest data produced after the +datahub_source ingestion job is started. This `stop_time` is reflected in the report. + +Data from the database and kafka are read in chronological order, specifically by the +createdon timestamp in the database and by kafka offset per partition. In order to +properly read from the database, please ensure that the `createdon` column is indexed. +Newly created databases should have this index, named `timeIndex`, by default, but older +ones you may have to create yourself, with the statement: + +``` +CREATE INDEX timeIndex ON metadata_aspect_v2 (createdon); +``` + +**If you do not have this index, the source may run incredibly slowly and produce +significant database load.** + +#### Stateful Ingestion +On first run, the source will read from the earliest data in the database and the earliest +kafka offsets. Every `commit_state_interval` (default 1000) records, the source will store +a checkpoint to remember its place, i.e. the last createdon timestamp and kafka offsets. +This allows you to stop and restart the source without losing much progress, but note that +you will re-ingest some data at the start of the new run. 
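+
+For reference, checkpointing is controlled by the following source options (a minimal sketch,
+not a full recipe; it assumes `commit_state_interval` and `commit_with_parse_errors` sit at the
+top level of the source config, and the values shown are the defaults described in this section):
+
+```yaml
+source:
+  type: datahub
+  config:
+    stateful_ingestion:
+      enabled: true # checkpoints are only saved when stateful ingestion is enabled
+    commit_state_interval: 1000 # save a checkpoint every 1000 records
+    commit_with_parse_errors: false # if false, stop committing checkpoints after the first error
+```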
+
+If any errors are encountered in the ingestion process, e.g. we are unable to emit an aspect
+due to network errors, the source will keep running, but will stop committing checkpoints,
+unless `commit_with_parse_errors` (default `false`) is set. Thus, if you re-run the ingestion,
+you can re-ingest the data that was missed, but note that it will re-ingest all subsequent data as well.
+
+If you want to re-ingest all data, you can set a different `pipeline_name` in your recipe,
+or set `stateful_ingestion.ignore_old_state`:
+
+```yaml
+source:
+  config:
+    # ... connection config, etc.
+    stateful_ingestion:
+      enabled: true
+      ignore_old_state: true
+```
+
+#### Limitations
+- Can only pull timeseries aspects that are still retained by Kafka; the default retention is 90 days.
+- Does not detect hard timeseries deletions, e.g. those performed via the CLI's `datahub delete` command.
+Therefore, if you deleted data in this way, it will still exist in the destination instance.
+- If you have a significant number of aspects with the exact same `createdon` timestamp,
+stateful ingestion will not be able to save a checkpoint partway through that timestamp.
+On a subsequent run, all aspects for that timestamp will be ingested.
+
+#### Performance
+On your destination DataHub instance, we suggest the following settings:
+- Enable [async ingestion](../../../../docs/deploy/environment-vars.md#ingestion)
+- Use standalone consumers
+([mae-consumer](../../../../metadata-jobs/mae-consumer-job/README.md)
+and [mce-consumer](../../../../metadata-jobs/mce-consumer-job/README.md))
+  * If you are migrating large amounts of data, consider scaling consumer replicas.
+- Increase the number of gms pods to add redundancy and increase resilience to node evictions
+  * If you are migrating large amounts of data, consider increasing Elasticsearch's
+    thread count via the `ELASTICSEARCH_THREAD_COUNT` environment variable.
diff --git a/metadata-ingestion/docs/sources/datahub/datahub_recipe.yml b/metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
new file mode 100644
index 0000000000000..c2dea5d6fd271
--- /dev/null
+++ b/metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
@@ -0,0 +1,23 @@
+pipeline_name: datahub_source_1
+datahub_api:
+  server: "http://localhost:8080" # Migrate data from DataHub instance on localhost:8080
+  token: ""
+source:
+  type: datahub
+  config:
+    include_all_versions: false
+    database_connection:
+      scheme: "mysql+pymysql" # or "postgresql+psycopg2" for Postgres
+      host_port: "<host>:<port>"
+      username: ""
+      password: ""
+      database: ""
+    kafka_connection:
+      bootstrap: "<host>:9092"
+      schema_registry_url: "<host>:8081"
+    stateful_ingestion:
+      enabled: true
+      ignore_old_state: false
+    extractor_config:
+      set_system_metadata: false # Replicate system metadata from the source, rather than overwriting it
+# sink config if necessary, e.g. to the output DataHub instance
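+# For example (a sketch only -- the server and token below are placeholders for your
+# destination instance, not values taken from this repository):
+#
+# sink:
+#   type: datahub-rest
+#   config:
+#     server: "http://<destination-host>:8080"
+#     token: "<destination-token>"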
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
index a054067d92334..f05b5db912cb9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
@@ -4,24 +4,25 @@
 from datahub.configuration.kafka import KafkaConsumerConnectionConfig
 from datahub.ingestion.source.sql.mysql import MySQLConnectionConfig
+from datahub.ingestion.source.sql.sql_config import SQLAlchemyConnectionConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfig,
     StatefulIngestionConfigBase,
 )
 
-DEFAULT_MYSQL_TABLE_NAME = "metadata_aspect_v2"
+DEFAULT_DATABASE_TABLE_NAME = "metadata_aspect_v2"
 DEFAULT_KAFKA_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1"
-DEFAULT_MYSQL_BATCH_SIZE = 10_000
+DEFAULT_DATABASE_BATCH_SIZE = 10_000
 
 
 class DataHubSourceConfig(StatefulIngestionConfigBase):
-    mysql_connection: MySQLConnectionConfig = Field(
-        default=MySQLConnectionConfig(),
-        description="MySQL connection config",
+    database_connection: Optional[SQLAlchemyConnectionConfig] = Field(
+        default=None,
+        description="Database connection config",
     )
 
-    kafka_connection: KafkaConsumerConnectionConfig = Field(
-        default=KafkaConsumerConnectionConfig(),
+    kafka_connection: Optional[KafkaConsumerConnectionConfig] = Field(
+        default=None,
         description="Kafka connection config",
     )
 
@@ -29,18 +30,19 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
         default=False,
         description=(
             "If enabled, include all versions of each aspect. "
-            "Otherwise, only include the latest version of each aspect."
+            "Otherwise, only include the latest version of each aspect. "
" + "If only the latest version is included, " ), ) - mysql_batch_size: int = Field( - default=DEFAULT_MYSQL_BATCH_SIZE, - description="Number of records to fetch from MySQL at a time", + database_query_batch_size: int = Field( + default=DEFAULT_DATABASE_BATCH_SIZE, + description="Number of records to fetch from the database at a time", ) - mysql_table_name: str = Field( - default=DEFAULT_MYSQL_TABLE_NAME, - description="Name of MySQL table containing all versioned aspects", + database_table_name: str = Field( + default=DEFAULT_DATABASE_TABLE_NAME, + description="Name of database table containing all versioned aspects", ) kafka_topic_name: str = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_mysql_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py similarity index 67% rename from metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_mysql_reader.py rename to metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py index adf4c1db57395..39702ba3ce347 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_mysql_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py @@ -10,33 +10,42 @@ from datahub.emitter.serialization_helper import post_json_transform from datahub.ingestion.source.datahub.config import DataHubSourceConfig from datahub.ingestion.source.datahub.report import DataHubSourceReport +from datahub.ingestion.source.sql.sql_config import SQLAlchemyConnectionConfig from datahub.metadata.schema_classes import ChangeTypeClass, SystemMetadataClass from datahub.utilities.lossy_collections import LossyDict, LossyList logger = logging.getLogger(__name__) -MYSQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" +# Should work for at least mysql, mariadb, postgres +DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" -class DataHubMySQLReader: - def __init__(self, config: DataHubSourceConfig, report: DataHubSourceReport): +class DataHubDatabaseReader: + def __init__( + self, + config: DataHubSourceConfig, + connection_config: SQLAlchemyConnectionConfig, + report: DataHubSourceReport, + ): self.config = config self.report = report self.engine = create_engine( - url=config.mysql_connection.get_sql_alchemy_url(), - **config.mysql_connection.options, + url=connection_config.get_sql_alchemy_url(), + **connection_config.options, ) @property def query(self) -> str: # May repeat rows for the same date - # Offset is generally 0, unless we repeat the same date twice + # Offset is generally 0, unless we repeat the same createdon twice return f""" SELECT urn, aspect, metadata, systemmetadata, createdon - FROM `{self.config.mysql_table_name}` + FROM `{self.config.database_table_name}` WHERE createdon >= %(since_createdon)s {"" if self.config.include_all_versions else "AND version = 0"} - ORDER BY createdon, urn, aspect, version # Ensures stable ordering + ORDER BY createdon, urn, aspect, # Ensure stable order, chronological per (urn, aspect) + CASE WHEN version = 0 THEN 1 ELSE 0 END, version + # Version 0 last, only when createdon is the same. 
             LIMIT %(limit)s OFFSET %(offset)s
         """
@@ -48,11 +57,11 @@ def get_aspects(
         ts = from_createdon
         offset = 0
         while ts.timestamp() <= stop_time.timestamp():
-            logger.debug(f"Polling MySQL aspects from {ts}")
+            logger.debug(f"Polling database aspects from {ts}")
             rows = conn.execute(
                 self.query,
-                since_createdon=ts.strftime(MYSQL_DATETIME_FORMAT),
-                limit=self.config.mysql_batch_size,
+                since_createdon=ts.strftime(DATETIME_FORMAT),
+                limit=self.config.database_query_batch_size,
                 offset=offset,
             )
             if not rows.rowcount:
@@ -64,7 +73,7 @@ def get_aspects(
                     row_dict = row._asdict()
                 else:
                     row_dict = dict(row)
-                mcp = self._parse_mysql_row(row_dict)
+                mcp = self._parse_row(row_dict)
                 if mcp:
                     yield mcp, row_dict["createdon"]
 
@@ -72,15 +81,13 @@ def get_aspects(
                 offset += i
             else:
                 ts = row_dict["createdon"]
-                print(ts)
                 offset = 0
 
-    def _parse_mysql_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
+    def _parse_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
         try:
             json_aspect = post_json_transform(json.loads(d["metadata"]))
             json_metadata = post_json_transform(json.loads(d["systemmetadata"] or "{}"))
             system_metadata = SystemMetadataClass.from_obj(json_metadata)
-            system_metadata.lastObserved = int(d["createdon"].timestamp() * 1000)
             return MetadataChangeProposalWrapper(
                 entityUrn=d["urn"],
                 aspect=ASPECT_MAP[d["aspect"]].from_obj(json_aspect),
@@ -91,8 +98,8 @@ def _parse_mysql_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
             logger.warning(
                 f"Failed to parse metadata for {d['urn']}: {e}", exc_info=True
             )
-            self.report.num_mysql_parse_errors += 1
-            self.report.mysql_parse_errors.setdefault(str(e), LossyDict()).setdefault(
-                d["aspect"], LossyList()
-            ).append(d["urn"])
+            self.report.num_database_parse_errors += 1
+            self.report.database_parse_errors.setdefault(
+                str(e), LossyDict()
+            ).setdefault(d["aspect"], LossyList()).append(d["urn"])
             return None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
index b165d70dd53b0..d9e53e87c2cea 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
@@ -11,6 +11,7 @@
 from confluent_kafka.schema_registry import SchemaRegistryClient
 from confluent_kafka.schema_registry.avro import AvroDeserializer
 
+from datahub.configuration.kafka import KafkaConsumerConnectionConfig
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.datahub.config import DataHubSourceConfig
@@ -27,10 +28,12 @@ class DataHubKafkaReader(Closeable):
     def __init__(
         self,
         config: DataHubSourceConfig,
+        connection_config: KafkaConsumerConnectionConfig,
         report: DataHubSourceReport,
         ctx: PipelineContext,
     ):
         self.config = config
+        self.connection_config = connection_config
         self.report = report
         self.group_id = f"{KAFKA_GROUP_PREFIX}-{ctx.pipeline_name}"
 
@@ -38,13 +41,13 @@ def __enter__(self) -> "DataHubKafkaReader":
         self.consumer = DeserializingConsumer(
             {
                 "group.id": self.group_id,
-                "bootstrap.servers": self.config.kafka_connection.bootstrap,
-                **self.config.kafka_connection.consumer_config,
+                "bootstrap.servers": self.connection_config.bootstrap,
+                **self.connection_config.consumer_config,
                 "auto.offset.reset": "earliest",
                 "enable.auto.commit": False,
                 "value.deserializer": AvroDeserializer(
schema_registry_client=SchemaRegistryClient( - {"url": self.config.kafka_connection.schema_registry_url} + {"url": self.connection_config.schema_registry_url} ), return_record_name=True, ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index 636e65a244dad..61207712341fe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -1,5 +1,6 @@ import logging from datetime import datetime, timezone +from functools import partial from typing import Dict, Iterable, List, Optional from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -11,10 +12,13 @@ support_status, ) from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport +from datahub.ingestion.api.source_helpers import auto_workunit_reporter from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.datahub.config import DataHubSourceConfig +from datahub.ingestion.source.datahub.datahub_database_reader import ( + DataHubDatabaseReader, +) from datahub.ingestion.source.datahub.datahub_kafka_reader import DataHubKafkaReader -from datahub.ingestion.source.datahub.datahub_mysql_reader import DataHubMySQLReader from datahub.ingestion.source.datahub.report import DataHubSourceReport from datahub.ingestion.source.datahub.state import StatefulDataHubIngestionHandler from datahub.ingestion.source.state.stateful_ingestion_base import ( @@ -46,30 +50,50 @@ def get_report(self) -> SourceReport: return self.report def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: - return [] # Exactly replicate data from DataHub source + # Exactly replicate data from DataHub source + return [partial(auto_workunit_reporter, self.get_report())] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - stop_time = datetime.now(tz=timezone.utc) - logger.info(f"Ingesting DataHub metadata up until roughly {stop_time}") + self.report.stop_time = datetime.now(tz=timezone.utc) + logger.info(f"Ingesting DataHub metadata up until {self.report.stop_time}") state = self.stateful_ingestion_handler.get_last_run_state() - yield from self._get_mysql_workunits(state.mysql_createdon_datetime, stop_time) - self._commit_progress() - yield from self._get_kafka_workunits(state.kafka_offsets, stop_time) - self._commit_progress() - def _get_mysql_workunits( - self, from_createdon: datetime, stop_time: datetime + if self.config.database_connection is not None: + yield from self._get_database_workunits( + from_createdon=state.database_createdon_datetime + ) + self._commit_progress() + else: + logger.info( + "Skipping ingestion of versioned aspects as no database_connection provided" + ) + + if self.config.kafka_connection is not None: + yield from self._get_kafka_workunits(from_offsets=state.kafka_offsets) + self._commit_progress() + else: + logger.info( + "Skipping ingestion of timeseries aspects as no kafka_connection provided" + ) + + def _get_database_workunits( + self, from_createdon: datetime ) -> Iterable[MetadataWorkUnit]: - logger.info(f"Fetching MySQL aspects from {from_createdon}") - reader = DataHubMySQLReader(self.config, self.report) - mcps = reader.get_aspects(from_createdon, stop_time) + if self.config.database_connection is None: + return + + logger.info(f"Fetching database aspects starting from {from_createdon}") + reader = DataHubDatabaseReader( + 
self.config, self.config.database_connection, self.report + ) + mcps = reader.get_aspects(from_createdon, self.report.stop_time) for i, (mcp, createdon) in enumerate(mcps): yield mcp.as_workunit() - self.report.num_mysql_aspects_ingested += 1 + self.report.num_database_aspects_ingested += 1 if ( self.config.commit_with_parse_errors - or not self.report.num_mysql_parse_errors + or not self.report.num_database_parse_errors ): self.stateful_ingestion_handler.update_checkpoint( last_createdon=createdon @@ -77,12 +101,18 @@ def _get_mysql_workunits( self._commit_progress(i) def _get_kafka_workunits( - self, from_offsets: Dict[int, int], stop_time: datetime + self, from_offsets: Dict[int, int] ) -> Iterable[MetadataWorkUnit]: - logger.info(f"Fetching timeseries aspects from kafka until {stop_time}") - - with DataHubKafkaReader(self.config, self.report, self.ctx) as reader: - mcls = reader.get_mcls(from_offsets=from_offsets, stop_time=stop_time) + if self.config.kafka_connection is None: + return + + logger.info(f"Fetching timeseries aspects from kafka") + with DataHubKafkaReader( + self.config, self.config.kafka_connection, self.report, self.ctx + ) as reader: + mcls = reader.get_mcls( + from_offsets=from_offsets, stop_time=self.report.stop_time + ) for i, (mcl, offset) in enumerate(mcls): mcp = MetadataChangeProposalWrapper.try_from_mcl(mcl) if mcp.changeType == ChangeTypeClass.DELETE: diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py index 3aa93d6a4577b..bd0a0aedaccd3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py @@ -1,3 +1,5 @@ +from datetime import timezone, datetime + from dataclasses import dataclass, field from datahub.ingestion.source.state.stateful_ingestion_base import ( @@ -8,10 +10,12 @@ @dataclass class DataHubSourceReport(StatefulIngestionReport): - num_mysql_aspects_ingested: int = 0 - num_mysql_parse_errors: int = 0 + stop_time: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc)) + + num_database_aspects_ingested: int = 0 + num_database_parse_errors: int = 0 # error -> aspect -> [urn] - mysql_parse_errors: LossyDict[str, LossyDict[str, LossyList[str]]] = field( + database_parse_errors: LossyDict[str, LossyDict[str, LossyList[str]]] = field( default_factory=LossyDict ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/state.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/state.py index deea9772fae20..4bedd331a9aea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/state.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/state.py @@ -16,14 +16,16 @@ class DataHubIngestionState(CheckpointStateBase): - mysql_createdon_ts: NonNegativeInt = 0 + database_createdon_ts: NonNegativeInt = 0 # Maps partition -> offset kafka_offsets: Dict[int, NonNegativeInt] = Field(default_factory=dict) @property - def mysql_createdon_datetime(self) -> datetime: - return datetime.fromtimestamp(self.mysql_createdon_ts / 1000, tz=timezone.utc) + def database_createdon_datetime(self) -> datetime: + return datetime.fromtimestamp( + self.database_createdon_ts / 1000, tz=timezone.utc + ) class PartitionOffset(NamedTuple): @@ -81,7 +83,7 @@ def update_checkpoint( if cur_checkpoint: cur_state = cast(DataHubIngestionState, cur_checkpoint.state) if last_createdon: - cur_state.mysql_createdon_ts = int(last_createdon.timestamp() * 1000) + 
cur_state.database_createdon_ts = int(last_createdon.timestamp() * 1000) if last_offset: cur_state.kafka_offsets[last_offset.partition] = last_offset.offset + 1 diff --git a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py index b017afc8c1448..5c52e1ab4f0b3 100644 --- a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py +++ b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py @@ -55,7 +55,6 @@ def assert_metadata_files_equal( output = load_json_file(output_path) if update_golden and not golden_exists: - golden = load_json_file(output_path) shutil.copyfile(str(output_path), str(golden_path)) return else: diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py index 51e2b0795819a..f2ade22b7bfa6 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_checkpoint.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timezone from typing import Dict, List import pydantic @@ -27,7 +27,7 @@ def _assert_checkpoint_deserialization( ) -> Checkpoint: # Serialize a checkpoint aspect with the previous state. checkpoint_aspect = DatahubIngestionCheckpointClass( - timestampMillis=int(datetime.now().timestamp() * 1000), + timestampMillis=int(datetime.now(tz=timezone.utc).timestamp() * 1000), pipelineName=test_pipeline_name, platformInstanceId="this-can-be-anything-and-will-be-ignored", config="this-is-also-ignored",