Skip to content

Commit

Permalink
split product data from csv
Browse files Browse the repository at this point in the history
  • Loading branch information
Apollosuny committed Dec 6, 2024
1 parent ab33dda commit bcaa252
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 4 deletions.
Binary file modified .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ RUN --mount=type=bind,target=./requirements.txt,src=./requirements.txt \
COPY --chown=airflow:airflow ../dags /app/dags
COPY --chown=airflow:airflow ../plugins /app/plugins
COPY --chown=airflow:airflow ../config /app/config
COPY --chown=airflow:airflow ../data /app/data

WORKDIR /app
8 changes: 7 additions & 1 deletion dags/init-db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from airflow.operators.python import PythonOperator

from plugins.models.initialize import initialize_db
from plugins.transform.transform_product_data import transform_product_data


@dag(
Expand All @@ -25,7 +26,12 @@ def initialize():
python_callable=initialize_db,
)

initialize_task
transform_product_data_task = PythonOperator(
task_id="transform_product_data",
python_callable=transform_product_data,
)

initialize_task >> transform_product_data_task


initialize()
Binary file added data/.DS_Store
Binary file not shown.
3 changes: 3 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ services:
environment:
PGADMIN_DEFAULT_EMAIL: [email protected]
PGADMIN_DEFAULT_PASSWORD: admin
networks:
- data-network

redis:
image: redis:latest
Expand Down Expand Up @@ -216,6 +218,7 @@ services:
- ./logs:/sources/logs
- ./plugins:/sources/plugins
- ./config:/sources/config
- ./data:/sources/data

volumes:
pgdata:
Expand Down
5 changes: 3 additions & 2 deletions plugins/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Order(base):

# relationships
customerId = Column(
Integer, ForeignKey("customer.customerId"), nullable=False
String, ForeignKey("customer.customerId"), nullable=False
)
customer = relationship("Customer", back_populates="order")

Expand Down Expand Up @@ -119,8 +119,9 @@ class ShippingCourierStatus(enum.Enum):

class Shipping(base):
__tablename__ = "shipping"
__table_args__ = (PrimaryKeyConstraint("orderId", name="orderId"),)
__table_args__ = (PrimaryKeyConstraint("shippingId", name="shippingId"),)

shippingId = Column(Integer, primary_key=True, autoincrement=True)
orderId = Column(String, ForeignKey("order.orderId"), nullable=False)
shipServiceLevel = Column(Enum(ShippingServiceLevel), nullable=False)
courierStatus = Column(Enum(ShippingCourierStatus), nullable=False)
Expand Down
33 changes: 33 additions & 0 deletions plugins/transform/transform_product_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pandas as pd
from sqlalchemy import create_engine

from config.settings import POSTGRES_CONN_STRING

# Module-level SQLAlchemy engine, built once at import time from the
# project's POSTGRES_CONN_STRING setting and used by process_data below.
engine = create_engine(POSTGRES_CONN_STRING)


def process_data(csv_file_path, table_name, selected_columns, chunk_size=1000):
    """Load selected columns of a CSV file into a database table in chunks.

    The file is streamed ``chunk_size`` rows at a time and every chunk is
    appended to ``table_name`` through the module-level ``engine``. All
    chunks are written inside a single transaction, so a failure part-way
    through rolls back the rows already written instead of leaving a
    partial load behind (which a retry would then duplicate).

    Args:
        csv_file_path: Path of the CSV file to read.
        table_name: Name of the destination table; rows are appended.
        selected_columns: CSV columns to load (passed to ``usecols``).
        chunk_size: Number of rows read and written per batch.

    Raises:
        Exception: Any error raised while reading the CSV or writing to
            the database is reported and then re-raised unchanged, so the
            caller (e.g. an Airflow task) fails visibly instead of
            silently reporting success.
    """
    try:
        # The reader is opened first so a missing or malformed file fails
        # before a database connection is taken; both the reader and the
        # transaction are released when the ``with`` block exits.
        with pd.read_csv(
            csv_file_path, chunksize=chunk_size, usecols=selected_columns
        ) as chunks, engine.begin() as connection:
            for chunk in chunks:
                chunk.to_sql(
                    name=table_name,
                    con=connection,
                    if_exists="append",
                    index=False,
                )
    except Exception as e:
        print(f"Error occurred: {e}")
        # Re-raise: swallowing the error would mark a failed load as done.
        raise
    # Reported once, only after every chunk has been committed.
    print(
        f"Data from {csv_file_path} successfully loaded into {table_name}."
    )


def transform_product_data():
    """Load the product columns of the Amazon sale report into ``products``.

    Delegates to ``process_data`` using its default chunk size.
    """
    # NOTE(review): the path is relative, so it is resolved against the
    # working directory of the calling process -- confirm that this is the
    # directory that contains ``data/``.
    source_csv = "data/amazon-sale-report.csv"
    destination_table = "products"
    product_columns = ["SKU", "Style", "Category", "Size", "ASIN"]
    process_data(source_csv, destination_table, product_columns)
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ methodtools==0.4.7
more-itertools==10.5.0
multidict==6.1.0
nodeenv==1.9.1
numpy==2.0.2
opentelemetry-api==1.27.0
opentelemetry-exporter-otlp==1.27.0
opentelemetry-exporter-otlp-proto-common==1.27.0
Expand All @@ -96,6 +97,7 @@ opentelemetry-sdk==1.27.0
opentelemetry-semantic-conventions==0.48b0
ordered-set==4.1.0
packaging==24.1
pandas==2.2.3
pathspec==0.12.1
pendulum==3.0.0
platformdirs==4.3.6
Expand All @@ -105,7 +107,7 @@ prison==0.2.1
propcache==0.2.0
protobuf==4.25.5
psutil==6.1.0
psycopg2==2.9.10
psycopg2-binary==2.9.10
pycparser==2.22
Pygments==2.18.0
PyJWT==2.9.0
Expand Down

0 comments on commit bcaa252

Please sign in to comment.