feat(ingest/datahub): Improvements, bug fixes, and docs
asikowitz committed Aug 28, 2023
1 parent 04ecf4f commit b76f2ab
Showing 11 changed files with 206 additions and 66 deletions.
4 changes: 4 additions & 0 deletions metadata-ingestion/docs/sources/datahub/README.md
@@ -0,0 +1,4 @@
Migrate data from one DataHub instance to another.

Requires direct access to the database, Kafka broker, and Kafka schema registry
of the source DataHub instance.
66 changes: 66 additions & 0 deletions metadata-ingestion/docs/sources/datahub/datahub_pre.md
@@ -0,0 +1,66 @@
### Overview

This source pulls data from two locations:
- The DataHub database, containing a single table holding all versioned aspects
- The DataHub Kafka cluster, reading from the [MCL Log](../../../../docs/what/mxe.md#metadata-change-log-mcl)
  topic for timeseries aspects.

All data is first read from the database before timeseries data is ingested from Kafka.
To prevent this source from running forever, it will not ingest data produced after the
datahub_source ingestion job is started. This `stop_time` is reflected in the report.
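
Below is a minimal sketch of this two-phase read with a `stop_time` cutoff. It is illustrative only; the reader callables and their `(aspect, timestamp)` output shape are assumptions, not the source's actual API.

```python
from datetime import datetime, timezone

# Fixed when the ingestion job starts; nothing produced after this is ingested.
stop_time = datetime.now(tz=timezone.utc)

def read_all_aspects(read_database_aspects, read_kafka_aspects):
    # Phase 1: versioned aspects from the database, oldest first.
    for aspect, created_on in read_database_aspects():
        if created_on > stop_time:
            break  # skip rows written after the job started
        yield aspect
    # Phase 2: timeseries aspects from the Kafka MCL topic.
    for aspect, event_time in read_kafka_aspects():
        if event_time > stop_time:
            break  # stop instead of tailing the topic forever
        yield aspect
```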

Data from the database and Kafka is read in chronological order, specifically by the
`createdon` timestamp in the database and by Kafka offset per partition. In order to
properly read from the database, please ensure that the `createdon` column is indexed.
Newly created databases should have this index, named `timeIndex`, by default, but on
older ones you may have to create it yourself, with the statement:

```sql
CREATE INDEX timeIndex ON metadata_aspect_v2 (createdon);
```

**If you do not have this index, the source may run incredibly slowly and produce
significant database load.**

#### Stateful Ingestion
On the first run, the source will read from the earliest data in the database and the earliest
Kafka offsets. Every `commit_state_interval` (default 1000) records, the source will store
a checkpoint to remember its place, i.e. the last `createdon` timestamp and Kafka offsets.
This allows you to stop and restart the source without losing much progress, but note that
you will re-ingest some data at the start of the new run.
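
A rough illustration of this checkpointing loop follows; the record shape, `emit`, and `save_checkpoint` below are assumed for the sketch and are not the actual implementation.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, Iterable, Optional, Tuple

@dataclass
class Checkpoint:
    last_createdon: Optional[datetime] = None
    kafka_offsets: Dict[int, int] = field(default_factory=dict)  # partition -> offset (updates omitted below)

def ingest_with_checkpoints(
    records: Iterable[Tuple[object, datetime]],   # (aspect, createdon) pairs, oldest first
    emit: Callable[[object], None],               # sends the aspect to the sink
    save_checkpoint: Callable[[Checkpoint], None],
    commit_state_interval: int = 1000,
) -> None:
    state = Checkpoint()
    for i, (aspect, created_on) in enumerate(records, start=1):
        emit(aspect)
        state.last_createdon = created_on
        if i % commit_state_interval == 0:
            save_checkpoint(state)  # a restart resumes from roughly here
```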

If any errors are encountered in the ingestion process, e.g. we are unable to emit an aspect
due to network errors, the source will keep running but will stop committing checkpoints,
unless `commit_with_parse_errors` (default `false`) is set. Thus, if you re-run the ingestion,
you can re-ingest the data that was missed, but note that it will also re-ingest all subsequent data.
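
The error-handling rule can be pictured like this; `saw_errors` and `save_checkpoint` are placeholder names, not part of the real source.

```python
def maybe_save_checkpoint(
    state,
    save_checkpoint,
    saw_errors: bool,
    commit_with_parse_errors: bool = False,
) -> None:
    # Keep ingesting either way, but once an error has been seen, stop advancing
    # the saved checkpoint unless committing past errors is explicitly allowed.
    if saw_errors and not commit_with_parse_errors:
        return
    save_checkpoint(state)
```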

If you want to re-ingest all data, you can set a different `pipeline_name` in your recipe,
or set `stateful_ingestion.ignore_old_state`:

```yaml
source:
  config:
    # ... connection config, etc.
    stateful_ingestion:
      enabled: true
      ignore_old_state: true
```

#### Limitations
- Can only pull timeseries aspects retained by Kafka, whose retention defaults to 90 days.
- Does not detect hard timeseries deletions, e.g. those made via a `datahub delete` CLI command.
  Therefore, if you deleted data in this way, it will still exist in the destination instance.
- If you have a significant number of aspects with the exact same `createdon` timestamp,
  stateful ingestion will not be able to save checkpoints partway through that timestamp.
  On a subsequent run, all aspects for that timestamp will be ingested.

#### Performance
On your destination DataHub instance, we suggest the following settings:
- Enable [async ingestion](../../../../docs/deploy/environment-vars.md#ingestion)
- Use standalone consumers
  ([mae-consumer](../../../../metadata-jobs/mae-consumer-job/README.md)
  and [mce-consumer](../../../../metadata-jobs/mce-consumer-job/README.md))
  * If you are migrating large amounts of data, consider scaling consumer replicas.
- Increase the number of gms pods to add redundancy and increase resilience to node evictions
  * If you are migrating large amounts of data, consider increasing Elasticsearch's
    thread count via the `ELASTICSEARCH_THREAD_COUNT` environment variable.
23 changes: 23 additions & 0 deletions metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
@@ -0,0 +1,23 @@
pipeline_name: datahub_source_1
datahub_api:
  server: "http://localhost:8080" # Migrate data from DataHub instance on localhost:8080
  token: "<token>"
source:
  type: datahub
  config:
    include_all_versions: false
    database_connection:
      scheme: "mysql+pymysql" # or "postgresql+psycopg2" for Postgres
      host_port: "<database_host>:<database_port>"
      username: "<username>"
      password: "<password>"
      database: "<database>"
    kafka_connection:
      bootstrap: "<bootstrap_url>:9092"
      schema_registry_url: "<schema_registry_url>:8081"
    stateful_ingestion:
      enabled: true
      ignore_old_state: false
  extractor_config:
    set_system_metadata: false # Replicate system metadata
# sink config if necessary, e.g. to the output DataHub instance
30 changes: 16 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
@@ -4,43 +4,45 @@

from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.ingestion.source.sql.mysql import MySQLConnectionConfig
from datahub.ingestion.source.sql.sql_config import SQLAlchemyConnectionConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfig,
    StatefulIngestionConfigBase,
)

DEFAULT_MYSQL_TABLE_NAME = "metadata_aspect_v2"
DEFAULT_DATABASE_TABLE_NAME = "metadata_aspect_v2"
DEFAULT_KAFKA_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1"
DEFAULT_MYSQL_BATCH_SIZE = 10_000
DEFAULT_DATABASE_BATCH_SIZE = 10_000


class DataHubSourceConfig(StatefulIngestionConfigBase):
    mysql_connection: MySQLConnectionConfig = Field(
        default=MySQLConnectionConfig(),
        description="MySQL connection config",
    database_connection: Optional[SQLAlchemyConnectionConfig] = Field(
        default=None,
        description="Database connection config",
    )

    kafka_connection: KafkaConsumerConnectionConfig = Field(
        default=KafkaConsumerConnectionConfig(),
    kafka_connection: Optional[KafkaConsumerConnectionConfig] = Field(
        default=None,
        description="Kafka connection config",
    )

    include_all_versions: bool = Field(
        default=False,
        description=(
            "If enabled, include all versions of each aspect. "
            "Otherwise, only include the latest version of each aspect."
            "Otherwise, only include the latest version of each aspect. "
            "If only the latest version is included, "
        ),
    )

    mysql_batch_size: int = Field(
        default=DEFAULT_MYSQL_BATCH_SIZE,
        description="Number of records to fetch from MySQL at a time",
    database_query_batch_size: int = Field(
        default=DEFAULT_DATABASE_BATCH_SIZE,
        description="Number of records to fetch from the database at a time",
    )

    mysql_table_name: str = Field(
        default=DEFAULT_MYSQL_TABLE_NAME,
        description="Name of MySQL table containing all versioned aspects",
    database_table_name: str = Field(
        default=DEFAULT_DATABASE_TABLE_NAME,
        description="Name of database table containing all versioned aspects",
    )

    kafka_topic_name: str = Field(
@@ -10,33 +10,42 @@
from datahub.emitter.serialization_helper import post_json_transform
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.report import DataHubSourceReport
from datahub.ingestion.source.sql.sql_config import SQLAlchemyConnectionConfig
from datahub.metadata.schema_classes import ChangeTypeClass, SystemMetadataClass
from datahub.utilities.lossy_collections import LossyDict, LossyList

logger = logging.getLogger(__name__)

MYSQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
# Should work for at least mysql, mariadb, postgres
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


class DataHubMySQLReader:
    def __init__(self, config: DataHubSourceConfig, report: DataHubSourceReport):
class DataHubDatabaseReader:
    def __init__(
        self,
        config: DataHubSourceConfig,
        connection_config: SQLAlchemyConnectionConfig,
        report: DataHubSourceReport,
    ):
        self.config = config
        self.report = report
        self.engine = create_engine(
            url=config.mysql_connection.get_sql_alchemy_url(),
            **config.mysql_connection.options,
            url=connection_config.get_sql_alchemy_url(),
            **connection_config.options,
        )

    @property
    def query(self) -> str:
        # May repeat rows for the same date
        # Offset is generally 0, unless we repeat the same date twice
        # Offset is generally 0, unless we repeat the same createdon twice
        return f"""
            SELECT urn, aspect, metadata, systemmetadata, createdon
            FROM `{self.config.mysql_table_name}`
            FROM `{self.config.database_table_name}`
            WHERE createdon >= %(since_createdon)s
            {"" if self.config.include_all_versions else "AND version = 0"}
            ORDER BY createdon, urn, aspect, version # Ensures stable ordering
            ORDER BY createdon, urn, aspect, # Ensure stable order, chronological per (urn, aspect)
            CASE WHEN version = 0 THEN 1 ELSE 0 END, version
            # Version 0 last, only when createdon is the same. Otherwise relies on createdon order
            LIMIT %(limit)s
            OFFSET %(offset)s
        """
@@ -48,11 +57,11 @@ def get_aspects(
        ts = from_createdon
        offset = 0
        while ts.timestamp() <= stop_time.timestamp():
            logger.debug(f"Polling MySQL aspects from {ts}")
            logger.debug(f"Polling database aspects from {ts}")
            rows = conn.execute(
                self.query,
                since_createdon=ts.strftime(MYSQL_DATETIME_FORMAT),
                limit=self.config.mysql_batch_size,
                since_createdon=ts.strftime(DATETIME_FORMAT),
                limit=self.config.database_query_batch_size,
                offset=offset,
            )
            if not rows.rowcount:
@@ -64,23 +73,21 @@ def get_aspects(
                    row_dict = row._asdict()
                else:
                    row_dict = dict(row)
                mcp = self._parse_mysql_row(row_dict)
                mcp = self._parse_row(row_dict)
                if mcp:
                    yield mcp, row_dict["createdon"]

                if ts == row_dict["createdon"]:
                    offset += i
                else:
                    ts = row_dict["createdon"]
                    print(ts)
                    offset = 0

    def _parse_mysql_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
    def _parse_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
        try:
            json_aspect = post_json_transform(json.loads(d["metadata"]))
            json_metadata = post_json_transform(json.loads(d["systemmetadata"] or "{}"))
            system_metadata = SystemMetadataClass.from_obj(json_metadata)
            system_metadata.lastObserved = int(d["createdon"].timestamp() * 1000)
            return MetadataChangeProposalWrapper(
                entityUrn=d["urn"],
                aspect=ASPECT_MAP[d["aspect"]].from_obj(json_aspect),
@@ -91,8 +98,8 @@ def _parse_mysql_row(self, d: Dict) -> Optional[MetadataChangeProposalWrapper]:
            logger.warning(
                f"Failed to parse metadata for {d['urn']}: {e}", exc_info=True
            )
            self.report.num_mysql_parse_errors += 1
            self.report.mysql_parse_errors.setdefault(str(e), LossyDict()).setdefault(
                d["aspect"], LossyList()
            ).append(d["urn"])
            self.report.num_database_parse_errors += 1
            self.report.database_parse_errors.setdefault(
                str(e), LossyDict()
            ).setdefault(d["aspect"], LossyList()).append(d["urn"])
        return None
@@ -11,6 +11,7 @@
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
@@ -27,24 +28,26 @@ class DataHubKafkaReader(Closeable):
    def __init__(
        self,
        config: DataHubSourceConfig,
        connection_config: KafkaConsumerConnectionConfig,
        report: DataHubSourceReport,
        ctx: PipelineContext,
    ):
        self.config = config
        self.connection_config = connection_config
        self.report = report
        self.group_id = f"{KAFKA_GROUP_PREFIX}-{ctx.pipeline_name}"

    def __enter__(self) -> "DataHubKafkaReader":
        self.consumer = DeserializingConsumer(
            {
                "group.id": self.group_id,
                "bootstrap.servers": self.config.kafka_connection.bootstrap,
                **self.config.kafka_connection.consumer_config,
                "bootstrap.servers": self.connection_config.bootstrap,
                **self.connection_config.consumer_config,
                "auto.offset.reset": "earliest",
                "enable.auto.commit": False,
                "value.deserializer": AvroDeserializer(
                    schema_registry_client=SchemaRegistryClient(
                        {"url": self.config.kafka_connection.schema_registry_url}
                        {"url": self.connection_config.schema_registry_url}
                    ),
                    return_record_name=True,
                ),
