Skip to content

Commit

Permalink
Merge pull request #3 from Apollosuny/huongdao
Browse files Browse the repository at this point in the history
Merge branch to main
  • Loading branch information
Apollosuny authored Jan 4, 2025
2 parents 394ac8a + 92eae96 commit 8ef96c8
Show file tree
Hide file tree
Showing 23 changed files with 1,338 additions and 80 deletions.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ POSTGRESQL_USERNAME=xxxxxx
POSTGRESQL_PASSWORD=xxxxxx
POSTGRESQL_PORT=xxxxx
POSTGRESQL_HOST=xxxxx
POSTGRESQL_DB_NAME=xxxxxx
POSTGRESQL_DATABASE=your_database_name
POSTGRESQL_SCHEMA_NAME=public

DATABASE_URL=postgresql+psycopg2://${POSTGRESQL_USERNAME}:${POSTGRESQL_PASSWORD}@${POSTGRESQL_HOST}:${POSTGRESQL_PORT}/${POSTGRESQL_DB_NAME}
20 changes: 10 additions & 10 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
repos:
- repo: https://github.com/psf/black
rev: 23.9.1
hooks:
- id: black
args: ['--line-length', '79']
# - repo: https://github.com/psf/black
# rev: 23.9.1
# hooks:
# - id: black
# args: ['--line-length', '79']

- repo: https://github.com/PyCQA/flake8
rev: 6.1.0
hooks:
- id: flake8
args: [--ignore=E501]
# - repo: https://github.com/PyCQA/flake8
# rev: 6.1.0
# hooks:
# - id: flake8
# args: [--ignore=E501]

- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.10.0
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ Access Database
# Access psql
psql -U <username> -d <database_name>
```

Data pipeline

```bash
initial_database >> insert_data_to_db >> create_dim >> create_fact
```
8 changes: 1 addition & 7 deletions dags/init-db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from airflow.operators.python import PythonOperator

from plugins.models.initialize import initialize_db
from plugins.transform.transform_product_data import transform_product_data


@dag(
Expand All @@ -26,12 +25,7 @@ def initialize():
python_callable=initialize_db,
)

transform_product_data_task = PythonOperator(
task_id="transform_product_data",
python_callable=transform_product_data,
)

initialize_task >> transform_product_data_task
initialize_task


initialize()
39 changes: 39 additions & 0 deletions dags/merge_erd_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from plugins.transform_erd.run import MERGE_ERD

# Default arguments applied to every task in this DAG.
# NOTE: "catchup" is a DAG-level setting rather than a task argument, so it
# is intentionally not listed here; it is passed to the DAG constructor.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def run_merge_erd_transform(last_timestamp=None):
    """Run the ERD merge transform.

    Builds a ``MERGE_ERD`` with the ``"airflow"`` logger and calls its
    ``transform`` method.

    Args:
        last_timestamp: Forwarded unchanged to ``MERGE_ERD.transform``.
            Defaults to ``None``.
    """
    MERGE_ERD(logging.getLogger("airflow")).transform(last_timestamp)


# DAG "merge_erd_dag": one PythonOperator task that runs the ERD transform,
# scheduled with a one-day interval and with catchup disabled.
with DAG(
    dag_id="merge_erd_dag",  # DAG name
    description="DAG để chạy biến đổi dữ liệu cho ERD",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=["merge_erd"],
) as dag:
    transform_task = PythonOperator(
        dag=dag,
        task_id="run_merge_erd_transform",
        python_callable=run_merge_erd_transform,
        # Explicitly passes last_timestamp=None to the callable.
        op_args=[None],
    )
55 changes: 55 additions & 0 deletions dags/merge_olap_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from plugins.transfrom_olap.run import MERGE_OLAP

# Default arguments applied to every task in this DAG.
# NOTE: "catchup" is a DAG-level setting rather than a task argument, so it
# is intentionally not listed here; it is passed to the DAG constructor.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def run_merge_olap_transform_dim(last_timestamp=None):
    """Run the dimension step of the OLAP merge.

    Builds a ``MERGE_OLAP`` with the ``"airflow"`` logger and calls its
    ``transform_dim`` method.

    Args:
        last_timestamp: Forwarded unchanged to ``MERGE_OLAP.transform_dim``.
            Defaults to ``None``.
    """
    airflow_logger = logging.getLogger("airflow")
    merge_olap = MERGE_OLAP(airflow_logger)
    merge_olap.transform_dim(last_timestamp)


def run_merge_olap_transform_fact(last_timestamp=None):
    """Run the fact step of the OLAP merge.

    Builds a ``MERGE_OLAP`` with the ``"airflow"`` logger and calls its
    ``transform_fact`` method.

    Args:
        last_timestamp: Forwarded unchanged to ``MERGE_OLAP.transform_fact``.
            Defaults to ``None``.
    """
    airflow_logger = logging.getLogger("airflow")
    merge_olap = MERGE_OLAP(airflow_logger)
    merge_olap.transform_fact(last_timestamp)


# Create the DAG "merge_olap_dag": two PythonOperator tasks, with the
# dimension task ordered before the fact task.
with DAG(
    dag_id="merge_olap_dag",
    description="DAG để chuyển đổi dữ liệu từ ERD sang OLAP",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=["merge_olap"],
) as dag:
    transform_dim_task = PythonOperator(
        dag=dag,
        task_id="run_merge_olap_transform_dim",
        python_callable=run_merge_olap_transform_dim,
        # Explicitly passes last_timestamp=None to the callable.
        op_args=[None],
    )

    transform_fact_task = PythonOperator(
        dag=dag,
        task_id="run_merge_olap_transform_fact",
        python_callable=run_merge_olap_transform_fact,
        # Explicitly passes last_timestamp=None to the callable.
        op_args=[None],
    )

    # The dimension task runs first; the fact task is downstream of it.
    transform_dim_task.set_downstream(transform_fact_task)
12 changes: 0 additions & 12 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,6 @@ services:
networks:
- data-network

pgadmin:
container_name: pgadmin_container
image: dpage/pgadmin4
restart: always
ports:
- '5050:80'
environment:
PGADMIN_DEFAULT_EMAIL: [email protected]
PGADMIN_DEFAULT_PASSWORD: admin
networks:
- data-network

redis:
image: redis:latest
expose:
Expand Down
10 changes: 10 additions & 0 deletions plugins/check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dotenv import load_dotenv
import os

load_dotenv()  # Load the environment variables

# Sanity check: report whether DATABASE_URL is present in the environment.
status_message = (
    "DATABASE_URL đã được tải thành công."
    if "DATABASE_URL" in os.environ
    else "DATABASE_URL bị thiếu."
)
print(status_message)
Loading

0 comments on commit 8ef96c8

Please sign in to comment.