Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactored adding support for RETL and Profiles #35

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ jobs:
- name: Install dependencies
run: |
pip3 install -r requirements.txt
- name: Test with unittest
- name: Test with pytest
run: |
pip3 install pytest-cov
make test
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
files: ./coverage.xml
token: ${{ secrets.CODECOV_TOKEN }}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*.egg-info
**/__pycache__
.vscode
.venv/


# Distribution / packaging
Expand Down Expand Up @@ -36,4 +37,4 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/
cover/
cover/
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
.PHONY: test
test:
python3 -m unittest discover -s rudder_airflow_provider/test
pytest --cov=rudder_airflow_provider rudder_airflow_provider/test --cov-report=xml

lint:
ruff check rudder_airflow_provider/*.py
91 changes: 46 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
<b>
<a href="https://rudderstack.com">Website</a>
·
<a href="https://www.rudderstack.com/docs/reverse-etl/features/airflow-provider">Documentation</a>
·
<a href="https://rudderstack.com/join-rudderstack-slack-community">Slack Community</a>
</b>
</p>
Expand All @@ -20,10 +18,8 @@

# RudderStack Airflow Provider

The [RudderStack](https://rudderstack.com) Airflow Provider lets you programmatically schedule and trigger your [Reverse ETL](https://www.rudderstack.com/docs/reverse-etl) syncs from outside RudderStack and integrate them with your existing Airflow workflows.
The [RudderStack](https://rudderstack.com) Airflow Provider lets you programmatically schedule and trigger your [Reverse ETL](https://www.rudderstack.com/docs/reverse-etl) syncs and Profiles runs outside RudderStack and integrate them with your existing Airflow workflows.

| For more information on using the Airflow Provider utility, refer to the [documentation](https://www.rudderstack.com/docs/reverse-etl/features/airflow-provider/). |
| :---------|

## Installation

Expand All @@ -33,78 +29,83 @@ pip install rudderstack-airflow-provider

## Usage

### RudderstackOperator
### RudderstackRETLOperator

> [!NOTE]
> Use [RudderstackRETLOperator](#rudderstackretloperator) for reverse ETL connections

A simple DAG for triggering syncs for a RudderStack source:
A simple DAG for triggering syncs for a RudderStack Reverse ETL source:

```python
with DAG(
'rudderstack-sample',
"rudderstack-retl-sample",
default_args=default_args,
description='A simple tutorial DAG',
description="A simple tutorial DAG for reverse etl",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['rs']
tags=["rs-retl"],
) as dag:
rs_operator = RudderstackOperator(
source_id='<source-id>',
task_id='<any-task-id>',
connection_id='rudderstack_conn'
# retl_connection_id, sync_type are template fields
rs_operator = RudderstackRETLOperator(
retl_connection_id="connection_id",
task_id="<replace task id>",
connection_id="<rudderstack api airflow connection id>"
)
```

For the complete code, refer to this [example](https://github.com/rudderlabs/rudder-airflow-provider/blob/main/examples/sample_dag.py).
For the complete code, refer to this [example](https://github.com/rudderlabs/rudder-airflow-provider/tree/main/examples).

#### Operator Parameters
Mandatatory parameters for RudderstackRETLOperator:
* retl_connection_id: This is the [connection id](https://www.rudderstack.com/docs/sources/reverse-etl/airflow-provider/#where-can-i-find-the-connection-id-for-my-reverse-etl-connection) for the sync job.
* connection_id: The Airflow connection to use for connecting to the Rudderstack API. Default value is `rudderstack_default`.

| Parameter | Description | Type | Default |
| :--- |:--- | :--- | :---
| `source_id` | Valid RudderStack source ID | String | `None` |
| `task_id` | A unique task ID within a DAG | String | `None` |
| `wait_for_completion` | If `True`, the task will wait for sync to complete. | Boolean | `False` |
| `connection_id` | The Airflow connection to use for connecting to the Rudderstack API. | String | `rudderstack_default` |

The RudderStack operator also supports all the parameters supported by the [Airflow base operator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html).
RudderstackRETLOperator exposes other configurable parameters as well. Mostly default values for them would be recommended.

For details on how to run the DAG in Airflow, refer to the [documentation](https://www.rudderstack.com/docs/reverse-etl/features/airflow-provider/#running-the-dag).
* request_max_retries: The maximum number of times requests to the RudderStack API should be retried before failng.
* request_retry_delay: Time (in seconds) to wait between each request retry.
* request_timeout: Time (in seconds) after which the requests to RudderStack are declared timed out.
* poll_interval: Time (in seconds) for polling status of triggered job.
* poll_timeout: Time (in seconds) after which the polling for a triggered job is declared timed out.
* wait_for_completion: Boolean if execution run should poll and wait till completion of sync. Default value is True.
* sync_type: Type of sync to trigger `incremental` or `full`. Default is None as RudderStack will be deteriming sync type.

### RudderstackRETLOperator

Trigger syncs for RETL connections
### RudderstackProfilesOperator

RudderstackProfilesOperator can be used to trigger profiles run. A simple DAG for triggering profile runs for a profiles project.

```python
with DAG('rudderstack-sample',
with DAG(
"rudderstack-profiles-sample",
default_args=default_args,
description='A simple tutorial DAG',
description="A simple tutorial DAG for profiles run.",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['rs']) as dag:
rs_operator = RudderstackRETLOperator(
retl_connection_id='2aiDQzMqP6LNuUokWstmaubcZOP',
task_id='retl-test-sync',
connection_id='rudder_yeshwanth_dev',
sync_type='full',
wait_for_completion=True
tags=["rs-profiles"],
) as dag:
# profile_id is template field
rs_operator = RudderstackProfilesOperator(
profile_id="<profile_id>",
task_id="<replace task id>",
connection_id="<rudderstack api connection id>",
)
```

#### Operator parameters

| Parameter | Description | Type | Default |
| :--- |:--- | :--- | :---
| `retl_connection_id` | Valid RudderStack RETL connection ID | String (templatable) | `None` |
| `task_id` | A unique task ID within a DAG | String | `None` |
| `wait_for_completion` | If `True`, the task will wait for sync to complete. | Boolean | `False` |
| `connection_id` | The Airflow connection to use for connecting to the Rudderstack API. | String | `rudderstack_default` |
|`sync_type`| Type of sync to trigger | `incremental` or `full` (templatable) | `incremental`|
Mandatatory parameters for RudderstackProfilesOperator:
* profile_id: This is the [profiles id](https://www.rudderstack.com/docs/api/profiles-api/#run-project) for the profiles project to run.
* connection_id: The Airflow connection to use for connecting to the Rudderstack API. Default value is `rudderstack_default`.

RudderstackRETLOperator exposes other configurable parameters as well. Mostly default values for them would be recommended.

For details on how to run the DAG in Airflow, refer to the [documentation](https://www.rudderstack.com/docs/reverse-etl/features/airflow-provider/#running-the-dag).
* request_max_retries: The maximum number of times requests to the RudderStack API should be retried before failng.
* request_retry_delay: Time (in seconds) to wait between each request retry.
* request_timeout: Time (in seconds) after which the requests to RudderStack are declared timed out.
* poll_interval: Time (in seconds) for polling status of triggered job.
* poll_timeout: Time (in seconds) after which the polling for a triggered job is declared timed out.
* wait_for_completion: Boolean if execution run should poll and wait till completion of sync. Default value is True.


## Contribute
Expand Down
34 changes: 34 additions & 0 deletions examples/profiles_sample_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime, timedelta

from airflow import DAG

from rudder_airflow_provider.operators.rudderstack import RudderstackProfilesOperator

default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG(
"rudderstack-profiles-sample",
default_args=default_args,
description="A simple tutorial DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["rs-profiles"],
) as dag:
# profile_id is template field
rs_operator = RudderstackProfilesOperator(
profile_id="{{ var.value.profile_id }}",
task_id="<replace task id>",
connection_id="<rudderstack api connection id>",
)

if __name__ == "__main__":
dag.test()
29 changes: 15 additions & 14 deletions examples/retl_sample.dag.py → examples/retl_sample_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@
from rudder_airflow_provider.operators.rudderstack import RudderstackRETLOperator

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG('rudderstack-sample',
with DAG(
"rudderstack-sample",
default_args=default_args,
description='A simple tutorial DAG',
description="A simple tutorial DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['rs']) as dag:
tags=["rs"],
) as dag:
# retl_connection_id, sync_type are template fields
rs_operator = RudderstackRETLOperator(
retl_connection_id="{{ var.value.retl_connection_id }}",
task_id='<replace task id>',
connection_id='<rudderstack api connection id>',
task_id="<replace task id>",
connection_id="<rudderstack api connection id>",
sync_type="{{ var.value.sync_type }}",
wait_for_completion=True
)

if __name__ == "__main__":
dag.test()
dag.test()
24 changes: 0 additions & 24 deletions examples/sample_dag.py

This file was deleted.

31 changes: 28 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
"setuptools >= 61.0",
"wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "rudderstack-airflow-provider"
version = "2.0.0"
readme = "README.md"
license = {file = "LICENSE"}
description = "Apache airflow provider for managing Reverse ETL syncs and Profiles runs in RudderStack."
keywords = [ "airflow", "orchestration", "rudderstack"]
classifiers = [
"Framework :: Apache Airflow",
"Framework :: Apache Airflow :: Provider",
]
dependencies = [
"apache-airflow",
"pytest",
"requests",
"responses",
"setuptools"
]
requires-python = ">= 3.6"

[tool.setuptools.packages.find]
exclude = ["*test*"]

[project.entry-points.apache_airflow_provider]
provider_info = "rudder_airflow_provider.__init__:get_provider_info"
6 changes: 4 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
apache-airflow == 2.8.0
requests == 2.28.2
apache-airflow == 2.10.0
requests == 2.32.3
setuptools == 65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability
pytest==7.3.1
ruff==0.6.8
6 changes: 3 additions & 3 deletions rudder_airflow_provider/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
def get_provider_info():
return {
'package-name': 'rudderstack-airflow-provider',
'name': 'rudderstack-airflow-provider',
'description': 'Apache airflow provider for Rudderstack'
"package-name": "rudderstack-airflow-provider",
"name": "rudderstack-airflow-provider",
"description": "Apache airflow provider for managing Reverse ETL syncs and Profiles runs in RudderStack.",
}
Loading