Skip to content

Commit

Permalink
feat(integration/fivetran): Fivetran connector integration (#9018)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
shubhamjagtap639 and hsheth2 authored Nov 8, 2023
1 parent 353584c commit e73e926
Show file tree
Hide file tree
Showing 20 changed files with 1,777 additions and 70 deletions.
4 changes: 4 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import databricksLogo from '../../../../images/databrickslogo.png';
import verticaLogo from '../../../../images/verticalogo.png';
import mlflowLogo from '../../../../images/mlflowlogo.png';
import dynamodbLogo from '../../../../images/dynamodblogo.png';
import fivetranLogo from '../../../../images/fivetranlogo.png';

export const ATHENA = 'athena';
export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`;
Expand Down Expand Up @@ -105,6 +106,8 @@ export const DBT_CLOUD = 'dbt-cloud';
export const DBT_CLOUD_URN = `urn:li:dataPlatform:dbt`;
export const VERTICA = 'vertica';
export const VERTICA_URN = `urn:li:dataPlatform:${VERTICA}`;
export const FIVETRAN = 'fivetran';
export const FIVETRAN_URN = `urn:li:dataPlatform:${FIVETRAN}`;

export const PLATFORM_URN_TO_LOGO = {
[ATHENA_URN]: athenaLogo,
Expand Down Expand Up @@ -138,6 +141,7 @@ export const PLATFORM_URN_TO_LOGO = {
[SUPERSET_URN]: supersetLogo,
[UNITY_CATALOG_URN]: databricksLogo,
[VERTICA_URN]: verticaLogo,
[FIVETRAN_URN]: fivetranLogo,
};

export const SOURCE_TO_PLATFORM_URN = {
Expand Down
7 changes: 7 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/vertica/",
"recipe": "source:\n type: vertica\n config:\n # Coordinates\n host_port: localhost:5433\n # The name of the vertica database\n database: Database_Name\n # Credentials\n username: Vertica_User\n password: Vertica_Password\n\n include_tables: true\n include_views: true\n include_projections: true\n include_models: true\n include_view_lineage: true\n include_projection_lineage: true\n profiling:\n enabled: false\n stateful_ingestion:\n enabled: true "
},
{
"urn": "urn:li:dataPlatform:fivetran",
"name": "fivetran",
"displayName": "Fivetran",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/fivetran/",
"recipe": "source:\n type: fivetran\n config:\n # Fivetran log connector destination server configurations\n fivetran_log_config:\n destination_platform: snowflake\n destination_config:\n # Coordinates\n account_id: snowflake_account_id\n warehouse: warehouse_name\n database: snowflake_db\n log_schema: fivetran_log_schema\n\n # Credentials\n username: ${SNOWFLAKE_USER}\n password: ${SNOWFLAKE_PASS}\n role: snowflake_role\n\n # Optional - filter for certain connector names instead of ingesting everything.\n # connector_patterns:\n # allow:\n # - connector_name\n\n # Optional -- This mapping is optional and only required to configure platform-instance for source\n # A mapping of Fivetran connector id to data platform instance\n # sources_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV\n\n # Optional -- This mapping is optional and only required to configure platform-instance for destination.\n # A mapping of Fivetran destination id to data platform instance\n # destination_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV"
},
{
"urn": "urn:li:dataPlatform:custom",
"name": "custom",
Expand Down
Binary file added datahub-web-react/src/images/fivetranlogo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
86 changes: 86 additions & 0 deletions metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
## Integration Details

This source extracts the following:

- Connectors in fivetran as Data Pipelines and Data Jobs to represent data lineage information between source and destination.
- Connector sources - DataJob input Datasets.
- Connector destination - DataJob output Datasets.
- Connector runs - DataProcessInstances as DataJob runs.

## Configuration Notes

1. Fivetran supports the fivetran platform connector to dump the log events and connectors, destinations, users and roles metadata in your destination.
2. You need to setup and start the initial sync of the fivetran platform connector before using this source. Refer [link](https://fivetran.com/docs/logs/fivetran-platform/setup-guide).
3. Once initial sync up of your fivetran platform connector is done, you need to provide the fivetran platform connector's destination platform and its configuration in the recipe.

## Concept mapping

| Fivetran | Datahub |
|--------------------------|--------------------------------------------------------------------------------------------------------|
| `Connector` | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) |
| `Source` | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) |
| `Destination` | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) |
| `Connector Run` | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |

Source and destination are mapped to Dataset as an Input and Output of Connector.

## Current limitations

Works only for Snowflake destination for now.

## Snowflake destination Configuration Guide
1. If your fivetran platform connector destination is snowflake, you need to provide user details and its role with correct privileges in order to fetch metadata.
2. Snowflake system admin can follow this guide to create a fivetran_datahub role, assign it the required privileges, and assign it to a user by executing the following Snowflake commands from a user with the ACCOUNTADMIN role or MANAGE GRANTS privilege.

```sql
create or replace role fivetran_datahub;

// Grant access to a warehouse to run queries to view metadata
grant operate, usage on warehouse "<your-warehouse>" to role fivetran_datahub;

// Grant access to view database and schema in which your log and metadata tables exist
grant usage on DATABASE "<fivetran-log-database>" to role fivetran_datahub;
grant usage on SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub;

// Grant access to execute select query on schema in which your log and metadata tables exist
grant select on all tables in SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub;

// Grant the fivetran_datahub to the snowflake user.
grant role fivetran_datahub to user snowflake_user;
```

## Advanced Configurations

### Working with Platform Instances
If you've multiple instances of source/destination systems that are referred in your `fivetran` setup, you'd need to configure platform instance for these systems in `fivetran` recipe to generate correct lineage edges. Refer the document [Working with Platform Instances](https://datahubproject.io/docs/platform-instances) to understand more about this.

While configuration of platform instance for source system you need to provide connector id as key and for destination system provide destination id as key.

#### Example - Multiple Postgres Source Connectors each reading from different postgres instance
```yml
# Map of connector source to platform instance
sources_to_platform_instance:
postgres_connector_id1:
platform_instance: cloud_postgres_instance
env: PROD

postgres_connector_id2:
platform_instance: local_postgres_instance
env: DEV
```
#### Example - Multiple Snowflake Destinations each writing to different snowflake instance
```yml
# Map of destination to platform instance
destination_to_platform_instance:
snowflake_destination_id1:
platform_instance: prod_snowflake_instance
env: PROD

snowflake_destination_id2:
platform_instance: dev_snowflake_instance
env: PROD
```
43 changes: 43 additions & 0 deletions metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
source:
type: fivetran
config:
# Fivetran log connector destination server configurations
fivetran_log_config:
destination_platform: snowflake
destination_config:
# Coordinates
account_id: "abc48144"
warehouse: "COMPUTE_WH"
database: "MY_SNOWFLAKE_DB"
log_schema: "FIVETRAN_LOG"

# Credentials
username: "${SNOWFLAKE_USER}"
password: "${SNOWFLAKE_PASS}"
role: "snowflake_role"

# Optional - filter for certain connector names instead of ingesting everything.
# connector_patterns:
# allow:
# - connector_name

# Optional -- A mapping of the connector's all sources to its database.
# sources_to_database:
# connector_id: source_db

# Optional -- This mapping is optional and only required to configure platform-instance for source
# A mapping of Fivetran connector id to data platform instance
# sources_to_platform_instance:
# connector_id:
# platform_instance: cloud_instance
# env: DEV

# Optional -- This mapping is optional and only required to configure platform-instance for destination.
# A mapping of Fivetran destination id to data platform instance
# destination_to_platform_instance:
# destination_id:
# platform_instance: cloud_instance
# env: DEV

sink:
# sink configs
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"unity-catalog": databricks | sqllineage_lib,
"fivetran": snowflake_common,
}

# This is mainly used to exclude plugins from the Docker image.
Expand Down Expand Up @@ -525,6 +526,7 @@
"nifi",
"vertica",
"mode",
"fivetran",
"kafka-connect",
]
if plugin
Expand Down Expand Up @@ -629,6 +631,7 @@
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
"fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource",
],
"datahub.ingestion.transformer.plugins": [
"simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership",
Expand Down
25 changes: 15 additions & 10 deletions metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]:
)
return [tags]

def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
def generate_mcp(
self, materialize_iolets: bool = True
) -> Iterable[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataJobInfoClass(
Expand All @@ -113,7 +115,9 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
)
yield mcp

yield from self.generate_data_input_output_mcp()
yield from self.generate_data_input_output_mcp(
materialize_iolets=materialize_iolets
)

for owner in self.generate_ownership_aspect():
mcp = MetadataChangeProposalWrapper(
Expand Down Expand Up @@ -144,7 +148,9 @@ def emit(
for mcp in self.generate_mcp():
emitter.emit(mcp, callback)

def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
def generate_data_input_output_mcp(
self, materialize_iolets: bool
) -> Iterable[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataJobInputOutputClass(
Expand All @@ -157,10 +163,9 @@ def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapp
yield mcp

# Force entity materialization
for iolet in self.inlets + self.outlets:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)

yield mcp
if materialize_iolets:
for iolet in self.inlets + self.outlets:
yield MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,10 @@ def emit_process_end(
self._emit_mcp(mcp, emitter, callback)

def generate_mcp(
self, created_ts_millis: Optional[int] = None
self, created_ts_millis: Optional[int] = None, materialize_iolets: bool = True
) -> Iterable[MetadataChangeProposalWrapper]:
"""
Generates mcps from the object
:rtype: Iterable[MetadataChangeProposalWrapper]
"""
"""Generates mcps from the object"""

mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataProcessInstanceProperties(
Expand Down Expand Up @@ -253,7 +251,7 @@ def generate_mcp(
)
yield mcp

yield from self.generate_inlet_outlet_mcp()
yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)

@staticmethod
def _emit_mcp(
Expand Down Expand Up @@ -329,7 +327,9 @@ def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
dpi._template_object = dataflow
return dpi

def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
def generate_inlet_outlet_mcp(
self, materialize_iolets: bool
) -> Iterable[MetadataChangeProposalWrapper]:
if self.inlets:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
Expand All @@ -349,10 +349,9 @@ def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
yield mcp

# Force entity materialization
for iolet in self.inlets + self.outlets:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)

yield mcp
if materialize_iolets:
for iolet in self.inlets + self.outlets:
yield MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def from_obj_require_wrapper(
return mcp

def as_workunit(
self, *, treat_errors_as_warnings: bool = False
self, *, treat_errors_as_warnings: bool = False, is_primary_source: bool = True
) -> "MetadataWorkUnit":
from datahub.ingestion.api.workunit import MetadataWorkUnit

Expand All @@ -254,10 +254,12 @@ def as_workunit(
id=f"{self.entityUrn}-{self.aspectName}-{ts}",
mcp=self,
treat_errors_as_warnings=treat_errors_as_warnings,
is_primary_source=is_primary_source,
)

return MetadataWorkUnit(
id=f"{self.entityUrn}-{self.aspectName}",
mcp=self,
treat_errors_as_warnings=treat_errors_as_warnings,
is_primary_source=is_primary_source,
)
13 changes: 11 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
Expand Down Expand Up @@ -64,9 +65,9 @@ def auto_status_aspect(
"""
For all entities that don't have a status aspect, add one with removed set to false.
"""

all_urns: Set[str] = set()
status_urns: Set[str] = set()
skip_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
Expand All @@ -89,9 +90,17 @@ def auto_status_aspect(
else:
raise ValueError(f"Unexpected type {type(wu.metadata)}")

if not isinstance(
wu.metadata, MetadataChangeEventClass
) and not entity_supports_aspect(wu.metadata.entityType, StatusClass):
# If any entity does not support aspect 'status' then skip that entity from adding status aspect.
# Example like dataProcessInstance doesn't suppport status aspect.
# If not skipped gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)

yield wu

for urn in sorted(all_urns - status_urns):
for urn in sorted(all_urns - status_urns - skip_urns):
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
Expand Down
Empty file.
Loading

0 comments on commit e73e926

Please sign in to comment.