Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): file-based state checkpoint provider #9029

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@
],
"datahub.ingestion.checkpointing_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
"file = datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider:FileIngestionCheckpointingProvider",
],
"datahub.ingestion.reporting_provider.plugins": [
"datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(
@classmethod
@abstractmethod
def create(
cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str
cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext
) -> "_Self":
pass

Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def get_latest_pipeline_checkpoint(
DatahubIngestionCheckpointingProvider,
)

checkpoint_provider = DatahubIngestionCheckpointingProvider(self, "graph")
checkpoint_provider = DatahubIngestionCheckpointingProvider(self)
job_name = StaleEntityRemovalHandler.compute_job_id(platform)

raw_checkpoint = checkpoint_provider.get_latest_checkpoint(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
from typing import Any, Dict, Generic, Optional, Type, TypeVar, cast
from typing import Any, Dict, Generic, Optional, Type, TypeVar

import pydantic
from pydantic import root_validator
Expand Down Expand Up @@ -38,10 +38,8 @@ class DynamicTypedStateProviderConfig(DynamicTypedConfig):
type: str = Field(
description="The type of the state provider to use. For DataHub use `datahub`",
)
# This config type is declared Optional[Any] here. The eventual parser for the
# specified type is responsible for further validation.
config: Optional[Any] = Field(
default=None,
config: Dict[str, Any] = Field(
default={},
description="The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
)

Expand Down Expand Up @@ -81,7 +79,7 @@ def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]:
if values.get("enabled"):
if values.get("state_provider") is None:
values["state_provider"] = DynamicTypedStateProviderConfig(
type="datahub", config=None
type="datahub", config={}
)
return values

Expand Down Expand Up @@ -246,15 +244,10 @@ def _initialize_checkpointing_state_provider(self) -> None:
f"Cannot find checkpoint provider class of type={self.stateful_ingestion_config.state_provider.type} "
" in the registry! Please check the type of the checkpointing provider in your config."
)
config_dict: Dict[str, Any] = cast(
Dict[str, Any],
self.stateful_ingestion_config.state_provider.dict().get("config", {}),
)
self.ingestion_checkpointing_state_provider = (
checkpointing_state_provider_class.create(
config_dict=config_dict,
config_dict=self.stateful_ingestion_config.state_provider.config,
ctx=self.ctx,
name=checkpointing_state_provider_class.__name__,
)
)
assert self.ingestion_checkpointing_state_provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@


class DatahubIngestionStateProviderConfig(IngestionCheckpointingProviderConfig):
datahub_api: Optional[DatahubClientConfig] = DatahubClientConfig()
datahub_api: DatahubClientConfig = DatahubClientConfig()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should remain optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no use of making it optional. It will just add the extra useless if condition of checking datahub_api because of lint error.
Still if it needs to remain optional let me know.



class DatahubIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
orchestrator_name: str = "datahub"

def __init__(self, graph: DataHubGraph, name: str):
super().__init__(name)
def __init__(
self,
graph: DataHubGraph,
):
super().__init__(self.__class__.__name__)
self.graph = graph
if not self._is_server_stateful_ingestion_capable():
raise ConfigurationError(
Expand All @@ -34,24 +37,14 @@ def __init__(self, graph: DataHubGraph, name: str):

@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
cls, config_dict: Dict[str, Any], ctx: PipelineContext
) -> "DatahubIngestionCheckpointingProvider":
config = DatahubIngestionStateProviderConfig.parse_obj(config_dict)
shubhamjagtap639 marked this conversation as resolved.
Show resolved Hide resolved
if ctx.graph:
# Use the pipeline-level graph if set
return cls(ctx.graph, name)
elif config_dict is None:
raise ConfigurationError("Missing provider configuration.")
return cls(ctx.graph)
else:
provider_config = (
DatahubIngestionStateProviderConfig.parse_obj_allow_extras(config_dict)
)
if provider_config.datahub_api:
graph = DataHubGraph(provider_config.datahub_api)
return cls(graph, name)
else:
raise ConfigurationError(
"Missing datahub_api. Provide either a global one or under the state_provider."
)
return cls(DataHubGraph(config.datahub_api))

def _is_server_stateful_ingestion_capable(self) -> bool:
server_config = self.graph.get_config() if self.graph else None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import logging
import pathlib
from datetime import datetime
from typing import Any, Dict, List, Optional

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
IngestionCheckpointingProviderBase,
IngestionCheckpointingProviderConfig,
JobId,
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

logger = logging.getLogger(__name__)


class FileIngestionStateProviderConfig(IngestionCheckpointingProviderConfig):
filename: str


class FileIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
orchestrator_name: str = "file"

def __init__(self, config: FileIngestionStateProviderConfig):
super().__init__(self.__class__.__name__)
self.config = config

@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext
) -> "FileIngestionCheckpointingProvider":
config = FileIngestionStateProviderConfig.parse_obj(config_dict)
return cls(config)

def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
) -> Optional[DatahubIngestionCheckpointClass]:
logger.debug(
f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}'"
)

data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)
latest_checkpoint: Optional[DatahubIngestionCheckpointClass] = None
try:
for obj in read_metadata_file(pathlib.Path(self.config.filename)):
if (
isinstance(obj, MetadataChangeProposalWrapper)
and obj.entityUrn == data_job_urn
and obj.aspect
and isinstance(obj.aspect, DatahubIngestionCheckpointClass)
and obj.aspect.get("pipelineName", "") == pipeline_name
):
latest_checkpoint = obj.aspect
break
except FileNotFoundError:
logger.debug(f"File {self.config.filename} not found")

if latest_checkpoint:
logger.debug(
f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}' found with start_time:"
f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}"
)
return latest_checkpoint
else:
logger.debug(
f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}' found"
)

return None

def commit(self) -> None:
if not self.state_to_commit:
logger.warning(f"No state available to commit for {self.name}")
return None

checkpoint_workunits: List[MetadataChangeProposalWrapper] = []
for job_name, checkpoint in self.state_to_commit.items():
# Emit the ingestion state for each job
logger.debug(
f"Committing ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', "
f"job:'{job_name}'"
)
datajob_urn = self.get_data_job_urn(
self.orchestrator_name,
checkpoint.pipelineName,
job_name,
)
checkpoint_workunits.append(
MetadataChangeProposalWrapper(
entityUrn=datajob_urn,
aspect=checkpoint,
)
)
write_metadata_file(pathlib.Path(self.config.filename), checkpoint_workunits)
self.committed = True
logger.debug(
f"Committed all ingestion checkpoints for pipeline:'{checkpoint.pipelineName}'"
)
26 changes: 26 additions & 0 deletions metadata-ingestion/tests/integration/lookml/golden_test_state.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,lookml_stateful,prod),lookml_stale_entity_removal)",
"changeType": "UPSERT",
"aspectName": "datahubIngestionCheckpoint",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"pipelineName": "lookml_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "base85-bz2-json",
"payload": "LRx4!F+o`-Q(4)<4JiNuUmt)_WdINa0@Mn>@BivB0a-v1sF;Ar&}h&A0K-EjK*+=xnKU%Oib;?JVrrXB7?aRqCarWwpZm8v5Yh+DsN{|c*msMh9%WJXjKPvIPsDn^@g3;DD9Q9kBh?*|=8M4uRW$_0HKn3XhN;RhAcLIBhLnO2%UA@Ykl;h&Xx(^@2;Y9C#d4g3K_2CA-I*M)h{NMA8Nu4C3XjEQYdh{nR--&lfRUsTL}OOkOO435f=1nKzYJ^9)mbBljM0}gaqy26URw1=q<80Eb9y)y?Vl88kG;g~MToq#r%6tr<yx^i_E#v)8~7vUJum>K9U`U?k}RS<@^?i@<c?y}RaZG9JGf09m`0f!sz%!^wDYcoJR{ix%d2rWCL+XvG>1M1@9*%tk}1N3hRzUaNB"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eventually I'd like to change this so that the compression is tied to the state provider, not the checkpoint object - but we can save that for a future PR

},
"runId": "lookml-test"
}
}
}
]
Loading
Loading