Skip to content

Commit

Permalink
ETLのメディア情報をcsvに書き出せるようにする
Browse files Browse the repository at this point in the history
  • Loading branch information
sushichan044 committed Oct 10, 2024
1 parent 223f57b commit 5229c67
Showing 1 changed file with 59 additions and 9 deletions.
68 changes: 59 additions & 9 deletions etl/src/birdxplorer_etl/transform.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
from sqlalchemy import select, func, and_, Integer
import csv
import os
import random
import uuid
from pathlib import Path
from typing import Generator

from prefect import get_run_logger
from sqlalchemy import Integer, and_, func, select
from sqlalchemy.orm import Session

from birdxplorer_common.storage import (
RowNoteRecord,
RowPostRecord,
RowUserRecord,
RowNoteStatusRecord,
RowPostEmbedURLRecord,
RowPostMediaRecord,
RowPostRecord,
RowUserRecord,
)
from birdxplorer_etl.lib.ai_model.ai_model_interface import get_ai_service
from birdxplorer_etl.settings import (
TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND,
TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND,
TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND,
)
import csv
import os
from prefect import get_run_logger
import uuid
import random


def transform_data(db: Session):
Expand Down Expand Up @@ -147,6 +152,7 @@ def transform_data(db: Session):
offset += limit

# Transform row post embed link
write_media_csv(db)
generate_post_link(db)

# Transform row post embed url data and generate post_embed_url.csv
Expand Down Expand Up @@ -178,6 +184,50 @@ def transform_data(db: Session):
return


def write_media_csv(db: Session) -> None:
media_csv_path = Path("./data/transformed/media.csv")
post_media_association_csv_path = Path("./data/transformed/post_media_association.csv")

if media_csv_path.exists():
media_csv_path.unlink(missing_ok=True)
if post_media_association_csv_path.exists():
post_media_association_csv_path.unlink(missing_ok=True)

with (
media_csv_path.open("a", newline="", encoding="utf-8") as media_csv,
post_media_association_csv_path.open("a", newline="", encoding="utf-8") as assoc_csv,
):
media_fields = ["media_key", "type", "url", "width", "height", "post_id"]
media_writer = csv.DictWriter(media_csv, fieldnames=media_fields)
media_writer.writeheader()
assoc_fields = ["post_id", "media_key"]
assoc_writer = csv.DictWriter(assoc_csv, fieldnames=assoc_fields)
assoc_writer.writeheader()

for m in _iterate_media(db):
media_writer.writerow(
{
"media_key": m.media_key,
"type": m.type,
"url": m.url,
"width": m.width,
"height": m.height,
"post_id": m.post_id,
}
)
assoc_writer.writerow({"post_id": m.post_id, "media_key": m.media_key})


def _iterate_media(db: Session, limit: int = 1000) -> Generator[RowPostMediaRecord, None, None]:
offset = 0
total_media: int = db.query(func.count(RowPostMediaRecord.media_key)).scalar() or 0

while offset < total_media:
yield from db.query(RowPostMediaRecord).limit(limit).offset(offset)

offset += limit


def generate_post_link(db: Session):
link_csv_file_path = "./data/transformed/post_link.csv"
association_csv_file_path = "./data/transformed/post_link_association.csv"
Expand Down

0 comments on commit 5229c67

Please sign in to comment.