Commit open-metadata/OpenMetadata@913daa5 (from refs/heads/main)
open-metadata committed Nov 3, 2023
1 parent f1f59a0 commit 913daa5
Showing 7 changed files with 106 additions and 258 deletions.
23 changes: 23 additions & 0 deletions content/partials/v1.2/deployment/upgrade/upgrade-prerequisites.md
@@ -198,6 +198,29 @@
In 1.1.7 and below you could run the Usage Workflow as `metadata ingest -c <path to yaml>`. Now, the Usage Workflow
has its own command `metadata usage -c <path to yaml>`.

### Custom Connectors

In 1.2.0 we have reorganized the internals of our Workflow handling to centralize status & exception management. This
simplifies how you handle statuses and exceptions in your Custom Connector code, and helps developers decide which
errors should be surfaced in the Workflow.

{% note %}

If you want to take a look at an updated Custom Connector and its changes, you can review the demo [PR](https://github.com/open-metadata/openmetadata-demo/pull/34/files).

{% /note %}

Let's list the changes:
1. You don't need to handle the `SourceStatus` anymore. The new base Workflow class takes care of it for you. Therefore, the import
`from metadata.ingestion.api.source import SourceStatus` is deprecated.
2. The `Source` class is now imported as `from metadata.ingestion.api.steps import Source` (instead of `from metadata.ingestion.api.source import Source`).
3. We now initialize the `OpenMetadata` object at the Workflow level, so it can be shared across steps. Therefore,
the source `__init__` method signature is now `def __init__(self, config: WorkflowSource, metadata: OpenMetadata):`. Make sure to store the `metadata` object
as `self.metadata` during `__init__`, and don't forget to call `super().__init__()`.
4. We have updated how status & exception management happens in the connectors. Each `yield` result is now wrapped in
an `Either` (imported from `metadata.ingestion.api.models`). Your correct data is `yield`ed in a `right`, while
errors are tracked in a `left`, as sketched after this list. Read more about the Workflow management [here](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/workflow/README.md).
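
To make points 3 and 4 concrete, here is a minimal sketch of a migrated Custom Connector. `MyConnector`, `read_rows`, and `to_create_request` are hypothetical names; the `StackTraceError` payload for the `left` side is an assumption based on the models shipped in `metadata.ingestion.api.models`.

```python
import traceback
from typing import Iterable

from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class MyConnector(Source):
    """Hypothetical Custom Connector migrated to the 1.2 API."""

    def __init__(self, config, metadata: OpenMetadata):
        self.config = config
        self.metadata = metadata  # store the shared OpenMetadata object
        super().__init__()  # don't forget to call the parent __init__

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "MyConnector":
        return cls(config=config_dict, metadata=metadata)

    def prepare(self):
        pass

    def test_connection(self) -> None:
        pass

    def _iter(self) -> Iterable[Either]:
        for row in self.read_rows():  # hypothetical helper
            try:
                # Correct data travels in a `right`
                yield Either(right=self.to_create_request(row))  # hypothetical helper
            except Exception:
                # Errors are tracked in a `left` and aggregated by the Workflow
                yield Either(
                    left=StackTraceError(
                        name=str(row),
                        error="Failed to process row",
                        stackTrace=traceback.format_exc(),
                    )
                )
```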

### Other Changes

- Pipeline Status timestamps are now expressed in milliseconds.
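
If you produce these values yourself, an epoch timestamp in milliseconds can be built in Python like this (illustrative only):

```python
from datetime import datetime

# Epoch timestamp in milliseconds
timestamp_millis = int(datetime.now().timestamp() * 1000)
```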
4 changes: 2 additions & 2 deletions content/v1.2.x/deployment/index.md
@@ -12,7 +12,7 @@ Are you exploring or doing a PoC?
{% inlineCallout
icon="celebration"
bold="Quickstart OpenMetadata"
href="/quick-start/local-deployment" %}
href="/quick-start/local-docker-deployment" %}
Get OpenMetadata up and running in under 5 minutes!
{% /inlineCallout %}

@@ -34,7 +34,7 @@ We support different kinds of deployment:
color="violet-70"
icon="celebration"
bold="Local Docker Deployment"
href="/quick-start/local-deployment"%}
href="/quick-start/local-docker-deployment"%}
Get OpenMetadata up and running locally in under 7 minutes!
{%/inlineCallout%}
{%inlineCallout
28 changes: 5 additions & 23 deletions content/v1.2.x/sdk/python/build-connector/bulk-sink.md
@@ -9,35 +9,17 @@ slug: /sdk/python/build-connector/bulk-sink
## API

```diff
-@dataclass  # type: ignore[misc]
-class BulkSink(Closeable, metaclass=ABCMeta):
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "BulkSink":
-        pass
-
-    @abstractmethod
-    def write_records(self) -> None:
-        pass
-
-    @abstractmethod
-    def get_status(self) -> BulkSinkStatus:
-        pass
-
-    @abstractmethod
-    def close(self) -> None:
-        pass
+class BulkSink(BulkStep, ABC):
+    """All Stages must inherit this base class."""
+
+    # From the parent - Adding here just to showcase
+    @abstractmethod
+    def run(self) -> None:
+        pass
```

**create** method is called during the workflow instantiation and creates an instance of the bulk sink.

**write_records** this method is called only once per Workflow run. It is the developer's responsibility to perform the bulk actions inside this method, such as reading the entire file or store to generate the API calls to external services.

**get_status** reports the status of the bulk sink, e.g., how many records were processed, and any failures or warnings.

**close** gets called before the workflow stops. It can be used to clean up any connections or other resources.

**run** this method is called only once per Workflow run. It is the developer's responsibility to perform the bulk actions inside this method, such as reading the entire file or store to generate the API calls to external services.
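
For illustration, here is a minimal sketch of a bulk sink against the new API. `FileBulkSink` and its `filename` config key are hypothetical; only the `BulkSink` base class and the `create`/`run`/`close` methods come from the API above.

```python
from metadata.ingestion.api.steps import BulkSink
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class FileBulkSink(BulkSink):
    """Hypothetical bulk sink that reads records staged to a local file."""

    def __init__(self, file_path: str, metadata: OpenMetadata):
        super().__init__()
        self.file_path = file_path
        self.metadata = metadata

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "FileBulkSink":
        return cls(file_path=config_dict["filename"], metadata=metadata)

    def run(self) -> None:
        # Called only once: read everything the Stage wrote and act in bulk
        with open(self.file_path) as fh:
            for line in fh:
                ...  # e.g., build and send API calls to external services

    def close(self) -> None:
        pass
```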

## Example
[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py#L52)
33 changes: 33 additions & 0 deletions content/v1.2.x/sdk/python/build-connector/index.md
@@ -12,6 +12,7 @@ Ingestion is a simple python framework to ingest the metadata from various sources.
Please look at our framework [APIs](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/src/metadata/ingestion/api).

## Workflow

[workflow](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/api/workflow.py) is a simple orchestration job that runs the components in order.

A workflow consists of [Source](/sdk/python/build-connector/source) and [Sink](/sdk/python/build-connector/sink). It also provides support for [Stage](/sdk/python/build-connector/stage) and [BulkSink](/sdk/python/build-connector/bulk-sink).
@@ -26,6 +27,36 @@ Workflow execution happens in a serial fashion.

In cases where we need aggregation over the records, we can use the **stage** to write to a file or other store. The file written during the **stage** step is then passed to the **bulk sink** to publish to external services such as **OpenMetadata** or **Elasticsearch**.

Each `Step` comes from this generic definition:

```python
class Step(ABC, Closeable):
"""All Workflow steps must inherit this base class."""

status: Status

def __init__(self):
self.status = Status()

@classmethod
@abstractmethod
def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step":
pass

def get_status(self) -> Status:
return self.status

@abstractmethod
def close(self) -> None:
pass
```

So we always need to implement these methods (a minimal sketch follows the list):
- `create` to initialize the actual step.
- `close` in case there's any connection that needs to be terminated.
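
Here is a minimal sketch of a custom step implementing both methods. `MyStep` is a hypothetical name and the `Step` import path is an assumption; everything else follows the definition above.

```python
from metadata.ingestion.api.step import Step  # assumed import path
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class MyStep(Step):
    """Hypothetical step showing the two required methods."""

    def __init__(self, metadata: OpenMetadata):
        super().__init__()  # initializes self.status
        self.metadata = metadata

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "MyStep":
        # Parse whatever configuration your step needs from config_dict
        return cls(metadata=metadata)

    def close(self) -> None:
        # Terminate any connection or resource opened by this step
        pass
```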

On top of this, you can find further notes on each specific step in the links below:

{% inlineCalloutContainer %}
{% inlineCallout
color="violet-70"
@@ -56,3 +87,5 @@
It can be used to bulk update the records generated in a workflow.
{% /inlineCallout %}
{% /inlineCalloutContainer %}

Read more about the Workflow management [here](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/workflow/README.md).
87 changes: 9 additions & 78 deletions content/v1.2.x/sdk/python/build-connector/sink.md
@@ -9,87 +9,18 @@ The **Sink** will get the event emitted by the source, one at a time.
## API

```diff
-@dataclass  # type: ignore[misc]
-class Sink(Closeable, metaclass=ABCMeta):
-    """All Sinks must inherit this base class."""
-
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
-        pass
-
-    @abstractmethod
-    def write_record(self, record: Record) -> None:
-        # must call callback when done.
-        pass
-
-    @abstractmethod
-    def get_status(self) -> SinkStatus:
-        pass
-
-    @abstractmethod
-    def close(self) -> None:
-        pass
+class Sink(ReturnStep, ABC):
+    """All Sinks must inherit this base class."""
+
+    # From the parent - Just to showcase
+    @abstractmethod
+    def _run(self, record: Entity) -> Either:
+        """
+        Main entrypoint to execute the step
+        """
```

**create** method is called during the workflow instantiation and creates an instance of the sink.

**write_record** this method is called for each record coming down the workflow chain and can be used to store the record in external services, etc.

**get_status** reports the status of the sink, e.g., how many records were processed, and any failures or warnings.

**close** gets called before the workflow stops. It can be used to clean up any connections or other resources.

**_run** this method is called for each record coming down the workflow chain and can be used to store the record in external services, etc.
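
As a minimal sketch of the new API, here is a sink that just prints every record. `PrintSink` is hypothetical; the `Entity` and `Either` types, and the `StackTraceError` payload, are assumptions based on `metadata.ingestion.api.models`.

```python
import traceback

from metadata.ingestion.api.models import Either, Entity, StackTraceError
from metadata.ingestion.api.steps import Sink
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class PrintSink(Sink):
    """Hypothetical sink that prints each record it receives."""

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "PrintSink":
        return cls()

    def _run(self, record: Entity) -> Either:
        try:
            print(record.json())  # replace with a call to your external service
            return Either(right=record)
        except Exception:
            return Either(
                left=StackTraceError(
                    name="PrintSink",
                    error="Failed to sink the record",
                    stackTrace=traceback.format_exc(),
                )
            )

    def close(self) -> None:
        pass
```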

## Example
Example implementation

```python
# (imports omitted in this excerpt)
class MetadataRestTablesSink(Sink):
    config: MetadataTablesSinkConfig
    status: SinkStatus

    def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = SinkStatus()
        self.wrote_something = False
        # the REST client is stored as self.metadata, matching its use below
        self.metadata = REST(self.metadata_config)

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = MetadataTablesSinkConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def write_record(self, entity_request) -> None:
        log = f"{type(entity_request).__name__} [{entity_request.name.__root__}]"
        try:
            created = self.metadata.create_or_update(entity_request)
            if created:
                self.status.records_written(
                    f"{type(created).__name__}: {created.fullyQualifiedName.__root__}"
                )
                logger.debug(f"Successfully ingested {log}")
            else:
                self.status.failure(log)
                logger.error(f"Failed to ingest {log}")

        except (APIError, HTTPError) as err:
            logger.debug(traceback.format_exc())
            logger.warning(f"Failed to ingest {log} due to api request failure: {err}")
            self.status.failure(log)

        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Failed to ingest {log}: {exc}")
            self.status.failure(log)

    def get_status(self):
        return self.status

    def close(self):
        pass
```
[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest.py#L87)
111 changes: 18 additions & 93 deletions content/v1.2.x/sdk/python/build-connector/source.md
@@ -10,115 +10,40 @@
## Source API

```diff
-@dataclass  # type: ignore[misc]
-class Source(Closeable, metaclass=ABCMeta):
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Source":
-        pass
-
-    @abstractmethod
-    def prepare(self):
-        pass
-
-    @abstractmethod
-    def next_record(self) -> Iterable[Record]:
-        pass
-
-    @abstractmethod
-    def get_status(self) -> SourceStatus:
-        pass
+class Source(IterStep, ABC):
+    """
+    Abstract source implementation. The workflow will run
+    its next_record and pass them to the next step.
+    """
+
+    metadata: OpenMetadata
+    connection_obj: Any
+    service_connection: Any
+
+    # From the parent - Adding here just to showcase
+    @abstractmethod
+    def _iter(self) -> Iterable[Either]:
+        """Main entrypoint to run through the Iterator"""
+
+    @abstractmethod
+    def test_connection(self) -> None:
+        pass
```

**create** method is used to create an instance of Source.

**prepare** will be called through Python's init method. This is the place where you can make connections to external sources or initialize the client library.

**next_record** is where the client can connect to an external resource and emit the data downstream.

**get_status** is for the [workflow](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/api/workflow.py) to call and report the status of the source, such as how many records it has processed, and any failures or warnings.

**_iter** is where the client can connect to an external resource and emit the data downstream.

**test_connection** is used (by OpenMetadata supported connectors ONLY) to validate permissions and connectivity before moving forward with the ingestion.
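
Adapted from the old inline example below, a minimal `_iter` for a Source might look like this. The service lookup and the single database request mirror the deprecated example; treat it as a sketch, not the canonical implementation.

```python
from typing import Iterable

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.api.models import Either


# Inside your Source subclass:
def _iter(self) -> Iterable[Either]:
    service_entity: DatabaseService = self.metadata.get_by_name(
        entity=DatabaseService, fqn=self.config.serviceName
    )
    # Correct results are wrapped in a `right`
    yield Either(
        right=CreateDatabaseRequest(
            name="awesome-database",
            service=service_entity.fullyQualifiedName,
        )
    )
```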

## Example

A simple example of this implementation is:

```python
# (imports omitted in this excerpt)
class SampleTablesSource(Source):

    def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
        super().__init__(ctx)
        self.status = SampleTableSourceStatus()
        self.config = config
        self.metadata_config = metadata_config
        # the REST client is stored as self.metadata, matching its use below
        self.metadata = REST(metadata_config)
        self.service_json = json.load(open(config.sample_schema_folder + "/service.json", "r"))
        self.database = json.load(open(config.sample_schema_folder + "/database.json", "r"))
        self.tables = json.load(open(config.sample_schema_folder + "/tables.json", "r"))
        self.service = get_service_or_create(self.service_json, metadata_config)

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        config = SampleTableSourceConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        pass

    def next_record(self) -> Iterable[Entity]:

        # helper defined elsewhere in the sample
        yield from self.yield_create_request_database_service(self.config)

        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )

        yield CreateDatabaseRequest(
            name="awesome-database",
            service=service_entity.fullyQualifiedName,
        )

        database_entity: Database = self.metadata.get_by_name(
            entity=Database,
            fqn=fqn.build(
                self.metadata,
                entity_type=Database,
                service_name=self.context.database_service.name.__root__,
                database_name="awesome-database",
            ),
        )

        yield CreateDatabaseSchemaRequest(
            name="awesome-schema",
            description="description",
            database=database_entity.fullyQualifiedName,
        )

        database_schema_entity: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema,
            fqn=fqn.build(
                self.metadata,
                entity_type=DatabaseSchema,
                service_name=self.context.database_service.name.__root__,
                database_name="awesome-database",
                schema_name="awesome-schema",
            ),
        )

        for table in self.tables:
            yield CreateTableRequest(
                name="awesome-table",
                description="description",
                columns=table["columns"],
                databaseSchema=database_schema_entity.fullyQualifiedName,
                tableConstraints=table.get("tableConstraints"),
                tableType=table["tableType"],
            )

    def close(self):
        pass

    def get_status(self):
        return self.status
```
A simple example of this implementation can be found in our demo Custom Connector [here](https://github.com/open-metadata/openmetadata-demo/blob/main/custom-connector/connector/my_csv_connector.py).

## For Consumers of openmetadata-ingestion to define custom connectors in their own package with the same namespace
