Commit c1f6f8f

Merge pull request #119 from codeforjapan/infra/etl-docker

remove prefect

yu23ki14 authored Oct 11, 2024
2 parents (abd4336 + 1513c70), commit c1f6f8f
Showing 7 changed files with 48 additions and 58 deletions.
22 changes: 22 additions & 0 deletions etl/Dockerfile
@@ -0,0 +1,22 @@
+ARG ENVIRONMENT="prod"
+
+# Use the official Python image from the Docker Hub
+FROM python:3.10
+
+# Set the working directory in the container
+WORKDIR /app
+COPY pyproject.toml ./
+COPY src/birdxplorer_etl/__init__.py ./src/birdxplorer_etl/
+COPY .env ./
+
+RUN python -m pip install --no-cache-dir --upgrade pip && \
+    pip install --no-cache-dir -e .[prod]
+
+RUN apt-get update && apt-get install -y --no-install-recommends libpq5 postgresql-client-15 sqlite3 \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY . ./
+
+# Set the entry point to run the ETL pipeline
+ENTRYPOINT ["python", "src/birdxplorer_etl/main.py"]
1 change: 1 addition & 0 deletions etl/pyproject.toml
@@ -52,6 +52,7 @@ dev=[
     "isort",
 ]
 prod=[
+    "psycopg2",
     "birdxplorer_common @ git+https://github.com/codeforjapan/BirdXplorer.git@main#subdirectory=common",
 ]
 
19 changes: 9 additions & 10 deletions etl/src/birdxplorer_etl/extract.py
@@ -1,8 +1,8 @@
 import csv
+import logging
 from datetime import datetime, timedelta
 import requests
 import stringcase
-from prefect import get_run_logger
 from sqlalchemy.orm import Session
 from lib.x.postlookup import lookup
 from birdxplorer_common.storage import (
@@ -17,12 +17,11 @@
 
 
 def extract_data(db: Session):
-    logger = get_run_logger()
-    logger.info("Downloading community notes data")
+    logging.info("Downloading community notes data")
 
     # get columns of post table
     columns = db.query(RowUserRecord).statement.columns.keys()
-    logger.info(columns)
+    logging.info(columns)
 
     # Fetch the note data and store it in SQLite
     date = datetime.now()
@@ -43,7 +42,7 @@ def extract_data(db: Session):
             "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/notes_sample.tsv"
         )
 
-        logger.info(note_url)
+        logging.info(note_url)
         res = requests.get(note_url)
 
         if res.status_code == 200:
@@ -66,7 +65,7 @@
     if settings.USE_DUMMY_DATA:
         status_url = "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/noteStatus_sample.tsv"
 
-        logger.info(status_url)
+        logging.info(status_url)
         res = requests.get(status_url)
 
         if res.status_code == 200:
@@ -102,17 +101,17 @@ def extract_data(db: Session):
         .filter(RowNoteRecord.created_at_millis <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND)
         .all()
     )
-    logger.info(len(postExtract_targetNotes))
+    logging.info(len(postExtract_targetNotes))
     for note in postExtract_targetNotes:
         tweet_id = note.tweet_id
 
         is_tweetExist = db.query(RowPostRecord).filter(RowPostRecord.post_id == str(tweet_id)).first()
         if is_tweetExist is not None:
-            logger.info(f"tweet_id {tweet_id} is already exist")
+            logging.info(f"tweet_id {tweet_id} is already exist")
             note.row_post_id = tweet_id
             continue
 
-        logger.info(tweet_id)
+        logging.info(tweet_id)
         post = lookup(tweet_id)
 
         if post == None or "data" not in post:
@@ -122,7 +121,7 @@
         created_at_millis = int(created_at.timestamp() * 1000)
 
         is_userExist = db.query(RowUserRecord).filter(RowUserRecord.user_id == post["data"]["author_id"]).first()
-        logger.info(is_userExist)
+        logging.info(is_userExist)
         if is_userExist is None:
             user_data = (
                 post["includes"]["users"][0]
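A note on the logging swap that this commit applies across every module: Prefect's get_run_logger() returned a logger already wired to the flow run, while the bare logging.info(...) calls above go through Python's root logger, which drops INFO-level messages until something configures it (its default level is WARNING). A minimal sketch, assuming no other BirdXplorer module calls basicConfig first:

import logging

# The root logger defaults to WARNING, so logging.info(...) is silent
# until it is configured. One basicConfig call at process start-up,
# e.g. in main.py, is enough.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

logging.info("Downloading community notes data")  # now emitted (to stderr)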
16 changes: 7 additions & 9 deletions etl/src/birdxplorer_etl/lib/sqlite/init.py
@@ -1,7 +1,7 @@
 # Create Note table for sqlite with columns: id, title, content, created_at, updated_at by sqlalchemy
 import os
+import logging
 
-from prefect import get_run_logger
 from sqlalchemy import create_engine, inspect
 from sqlalchemy.orm import sessionmaker
 
@@ -15,29 +15,27 @@
 
 
 def init_db():
-    logger = get_run_logger()
-
     # ToDo: the db file should live in external storage such as S3.
     db_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "data", "note.db"))
-    logger.info(f"Initializing database at {db_path}")
+    logging.info(f"Initializing database at {db_path}")
     engine = create_engine("sqlite:///" + db_path)
 
     # Create the tables of the temporary database
    # ToDo: add whatever else is needed besides the note table
     if not inspect(engine).has_table("row_notes"):
-        logger.info("Creating table note")
+        logging.info("Creating table note")
         RowNoteRecord.metadata.create_all(engine)
     if not inspect(engine).has_table("row_posts"):
-        logger.info("Creating table post")
+        logging.info("Creating table post")
         RowPostRecord.metadata.create_all(engine)
     if not inspect(engine).has_table("row_note_status"):
-        logger.info("Creating table note_status")
+        logging.info("Creating table note_status")
         RowNoteStatusRecord.metadata.create_all(engine)
     if not inspect(engine).has_table("row_users"):
-        logger.info("Creating table user")
+        logging.info("Creating table user")
         RowUserRecord.metadata.create_all(engine)
     if not inspect(engine).has_table("row_post_embed_urls"):
-        logger.info("Creating table post_embed_urls")
+        logging.info("Creating table post_embed_urls")
         RowPostEmbedURLRecord.metadata.create_all(engine)
 
     Session = sessionmaker(bind=engine)
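An aside on the guards above, not part of this commit: SQLAlchemy's MetaData.create_all() already skips tables that exist (checkfirst=True by default), so if the five Row*Record classes share one declarative Base — an assumption, not verified here — the whole if/has_table block could collapse to a single call. Sketch under that assumption:

from sqlalchemy import create_engine

from birdxplorer_common.storage import RowNoteRecord  # same import path extract.py uses

engine = create_engine("sqlite:///data/note.db")  # illustrative path
# create_all() only emits CREATE TABLE for tables missing from the database.
RowNoteRecord.metadata.create_all(engine)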
5 changes: 2 additions & 3 deletions etl/src/birdxplorer_etl/lib/x/postlookup.py
@@ -1,7 +1,7 @@
 import requests
 import settings
-from prefect import get_run_logger
 import time
+import logging
 
 
 def create_url(id):
@@ -30,11 +30,10 @@ def bearer_oauth(r):
 
 
 def connect_to_endpoint(url):
-    logger = get_run_logger()
     response = requests.request("GET", url, auth=bearer_oauth)
     if response.status_code == 429:
         limit = response.headers["x-rate-limit-reset"]
-        logger.info("Waiting for rate limit reset...")
+        logging.info("Waiting for rate limit reset...")
         time.sleep(int(limit) - int(time.time()) + 1)
         data = connect_to_endpoint(url)
         return data
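The 429 branch above retries by calling connect_to_endpoint recursively after sleeping until the Unix timestamp (in seconds) reported in X's x-rate-limit-reset header. For illustration only, a hypothetical iterative equivalent — not part of this commit, and with the auth helper passed in rather than taken from module scope:

import logging
import time

import requests


def connect_to_endpoint_iterative(url, auth):
    # Loop instead of recursing on HTTP 429.
    while True:
        response = requests.get(url, auth=auth)
        if response.status_code == 429:
            # x-rate-limit-reset is a Unix timestamp in seconds.
            reset_at = int(response.headers["x-rate-limit-reset"])
            logging.info("Waiting for rate limit reset...")
            time.sleep(max(reset_at - int(time.time()) + 1, 0))
            continue
        return response.json()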
34 changes: 2 additions & 32 deletions etl/src/birdxplorer_etl/main.py
@@ -1,40 +1,10 @@
-from prefect import flow, task
-from sqlalchemy.orm import Session
-
 from lib.sqlite.init import init_db
 from extract import extract_data
 from load import load_data
 from transform import transform_data
 
-
-@task
-def initialize():
+if __name__ == "__main__":
     db = init_db()
-    return {"db": db}
-
-
-@task
-def extract(db: Session):
     extract_data(db)
-
-
-@task
-def transform(db: Session):
     transform_data(db)
-
-
-@task
-def load():
-    return load_data()
-
-
-@flow
-def run_etl():
-    i = initialize()
-    _ = extract(i["db"])
-    _ = transform(i["db"])
-    _ = load()
-
-
-if __name__ == "__main__":
-    run_etl()
+    load_data()
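With Prefect's tasks and flow gone, main.py is now a plain sequential script, which suits the Dockerfile's ENTRYPOINT. One sketch of how the same entrypoint could surface failures to the container runtime — assuming no in-process retries are wanted and a non-zero exit code is the failure signal; this is not what the commit does:

import logging
import sys

from lib.sqlite.init import init_db
from extract import extract_data
from load import load_data
from transform import transform_data

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)  # see the note after extract.py
    try:
        db = init_db()
        extract_data(db)
        transform_data(db)
        load_data()
    except Exception:
        logging.exception("ETL run failed")
        sys.exit(1)  # non-zero exit lets Docker/ECS mark the run as failed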
9 changes: 5 additions & 4 deletions etl/src/birdxplorer_etl/transform.py
@@ -2,6 +2,7 @@
 import os
 import random
 import uuid
+import logging
 from pathlib import Path
 from typing import Generator
 
@@ -25,8 +26,8 @@
 
 
 def transform_data(db: Session):
-    logger = get_run_logger()
-    logger.info("Transforming data")
+
+    logging.info("Transforming data")
 
     if not os.path.exists("./data/transformed"):
         os.makedirs("./data/transformed")
@@ -55,7 +56,7 @@ def transform_data(db: Session):
 
     with open("./data/transformed/note.csv", "a") as file:
 
-        logger.info(f"Transforming note data: {num_of_notes}")
+        logging.info(f"Transforming note data: {num_of_notes}")
         while offset < num_of_notes:
             notes = db.execute(
                 select(
@@ -78,7 +79,7 @@ def transform_data(db: Session):
             offset += limit
 
     # Transform row post data and generate post.csv
-    logger.info("Transforming post data")
+    logging.info("Transforming post data")
 
     if os.path.exists("./data/transformed/post.csv"):
         os.remove("./data/transformed/post.csv")
