Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract and Transform to csv #72

Merged
merged 4 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions common/birdxplorer_common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Base(DeclarativeBase):
NonNegativeInt: DECIMAL,
MediaDetails: JSON,
BinaryBool: CHAR,
String: String,
}


Expand Down Expand Up @@ -128,6 +129,46 @@ class RowNoteRecord(Base):
harmful: Mapped[NotesHarmful] = mapped_column(nullable=False)
validation_difficulty: Mapped[SummaryString] = mapped_column(nullable=False)
summary: Mapped[SummaryString] = mapped_column(nullable=False)
row_post_id: Mapped[TweetId] = mapped_column(ForeignKey("row_posts.post_id"), nullable=True)
row_post: Mapped["RowPostRecord"] = relationship("RowPostRecord", back_populates="row_notes")


class RowPostRecord(Base):
    """Raw X (Twitter) post record as fetched from the single-post lookup API.

    Stores the post text, the (optional) first attached media item, public
    engagement metrics at extraction time, and relationships back to the
    community notes written about the post and to the raw author record.
    """

    __tablename__ = "row_posts"

    # Post (tweet) ID as assigned by X; primary key.
    post_id: Mapped[TweetId] = mapped_column(primary_key=True)
    # Author's user ID; must reference an existing row in row_users.
    author_id: Mapped[UserId] = mapped_column(ForeignKey("row_users.user_id"), nullable=False)
    text: Mapped[SummaryString] = mapped_column(nullable=False)
    # Media columns are nullable: posts without attachments have neither.
    media_type: Mapped[String] = mapped_column(nullable=True)
    media_url: Mapped[String] = mapped_column(nullable=True)
    # NOTE(review): presumably a Unix-epoch timestamp in milliseconds —
    # confirm against the TwitterTimestamp type definition.
    created_at: Mapped[TwitterTimestamp] = mapped_column(nullable=False)
    # Public engagement metrics captured at extraction time.
    like_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    repost_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    bookmark_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    impression_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    quote_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    reply_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    # NOTE(review): no explicit nullable= here, unlike every other column —
    # confirm the default nullability is intended.
    lang: Mapped[String] = mapped_column()
    # NOTE(review): annotated as a scalar, but a post can have many notes
    # (RowNoteRecord.row_post uses back_populates="row_notes") — confirm
    # whether this should be Mapped[list["RowNoteRecord"]].
    row_notes: Mapped["RowNoteRecord"] = relationship("RowNoteRecord", back_populates="row_post")
    user: Mapped["RowUserRecord"] = relationship("RowUserRecord", back_populates="row_post")


class RowUserRecord(Base):
    """Raw X (Twitter) user record for the author of an extracted post.

    Populated from the ``includes.users`` section of the post lookup API
    response; holds profile fields and public account metrics.
    """

    __tablename__ = "row_users"

    # User ID as assigned by X; primary key.
    user_id: Mapped[UserId] = mapped_column(primary_key=True)
    # Display name (mutable, user-chosen).
    name: Mapped[UserName] = mapped_column(nullable=False)
    # @handle / screen name.
    user_name: Mapped[UserName] = mapped_column(nullable=False)
    description: Mapped[SummaryString] = mapped_column(nullable=False)
    profile_image_url: Mapped[String] = mapped_column(nullable=False)
    # Public account metrics captured at extraction time.
    followers_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    following_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    tweet_count: Mapped[NonNegativeInt] = mapped_column(nullable=False)
    verified: Mapped[BinaryBool] = mapped_column(nullable=False)
    verified_type: Mapped[String] = mapped_column(nullable=False)
    # NOTE(review): location and url can be absent on X profiles, yet are
    # declared NOT NULL here — confirm the extractor always supplies a value.
    location: Mapped[String] = mapped_column(nullable=False)
    url: Mapped[String] = mapped_column(nullable=False)
    # NOTE(review): annotated as a scalar, but a user can author many posts
    # (RowPostRecord.user uses back_populates="row_post") — confirm whether
    # this should be Mapped[list["RowPostRecord"]].
    row_post: Mapped["RowPostRecord"] = relationship("RowPostRecord", back_populates="user")


class Storage:
Expand Down
98 changes: 85 additions & 13 deletions etl/src/birdxplorer_etl/extract.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import csv
from datetime import datetime, timedelta

import requests
import stringcase
from prefect import get_run_logger
from sqlalchemy.orm import Session

from birdxplorer_common.storage import RowNoteRecord

from lib.x.postlookup import lookup
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord
import settings
import time


def extract_data(db: Session):
logger = get_run_logger()
logger.info("Downloading community notes data")

# get columns of post table
columns = db.query(RowUserRecord).statement.columns.keys()
logger.info(columns)

# Noteデータを取得してSQLiteに保存
date = datetime.now()
latest_note = db.query(RowNoteRecord).order_by(RowNoteRecord.created_at_millis.desc()).first()
Expand All @@ -36,20 +39,89 @@ def extract_data(db: Session):
reader = csv.DictReader(tsv_data, delimiter="\t")
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
for row in reader:
db.add(RowNoteRecord(**row))
if db.query(RowNoteRecord).filter(RowNoteRecord.note_id == row["note_id"]).first():
continue
rows_to_add.append(RowNoteRecord(**row))
db.bulk_save_objects(rows_to_add)

break
date = date - timedelta(days=1)

db.commit()

row1 = db.query(RowNoteRecord).first()
logger.info(row1)
# post = lookup()
# created_at = datetime.strptime(post["data"]["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
# created_at_millis = int(created_at.timestamp() * 1000)
# db_post = RowPostRecord(post_id=post["data"]["id"], author_id=post["data"]["author_id"], text=post["data"]["text"], created_at=created_at_millis,like_count=post["data"]["public_metrics"]["like_count"],repost_count=post["data"]["public_metrics"]["retweet_count"],bookmark_count=post["data"]["public_metrics"]["bookmark_count"],impression_count=post["data"]["public_metrics"]["impression_count"],quote_count=post["data"]["public_metrics"]["quote_count"],reply_count=post["data"]["public_metrics"]["reply_count"],lang=post["data"]["lang"])
# db.add(db_post)
# db.commit()

# Noteに紐づくtweetデータを取得
postExtract_targetNotes = (
db.query(RowNoteRecord)
.filter(RowNoteRecord.tweet_id != None)
.filter(RowNoteRecord.created_at_millis >= settings.TARGET_TWITTER_POST_START_UNIX_MILLISECOND)
.filter(RowNoteRecord.created_at_millis <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND)
.all()
)
logger.info(len(postExtract_targetNotes))
for note in postExtract_targetNotes:
tweet_id = note.tweet_id

is_tweetExist = db.query(RowPostRecord).filter(RowPostRecord.post_id == tweet_id).first()
if is_tweetExist is not None:
note.row_post_id = tweet_id
continue

logger.info(tweet_id)
post = lookup(tweet_id)
created_at = datetime.strptime(post["data"]["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
created_at_millis = int(created_at.timestamp() * 1000)

is_userExist = db.query(RowUserRecord).filter(RowUserRecord.user_id == post["data"]["author_id"]).first()
logger.info(is_userExist)
if is_userExist is None:
db_user = RowUserRecord(
user_id=post["data"]["author_id"],
name=post["includes"]["users"][0]["name"],
user_name=post["includes"]["users"][0]["username"],
description=post["includes"]["users"][0]["description"],
profile_image_url=post["includes"]["users"][0]["profile_image_url"],
followers_count=post["includes"]["users"][0]["public_metrics"]["followers_count"],
following_count=post["includes"]["users"][0]["public_metrics"]["following_count"],
tweet_count=post["includes"]["users"][0]["public_metrics"]["tweet_count"],
verified=post["includes"]["users"][0]["verified"],
verified_type=post["includes"]["users"][0]["verified_type"],
location=post["includes"]["users"][0]["location"],
url=post["includes"]["users"][0]["url"],
)
db.add(db_user)

media_url = post["includes"]["media"][0]["url"]
db_post = RowPostRecord(
post_id=post["data"]["id"],
author_id=post["data"]["author_id"],
text=post["data"]["text"],
media_type=post["includes"]["media"][0]["type"],
media_url=media_url,
created_at=created_at_millis,
like_count=post["data"]["public_metrics"]["like_count"],
repost_count=post["data"]["public_metrics"]["retweet_count"],
bookmark_count=post["data"]["public_metrics"]["bookmark_count"],
impression_count=post["data"]["public_metrics"]["impression_count"],
quote_count=post["data"]["public_metrics"]["quote_count"],
reply_count=post["data"]["public_metrics"]["reply_count"],
lang=post["data"]["lang"],
)
db.add(db_post)
note.row_post_id = tweet_id
time.sleep(1)
continue
db.commit()

# select note from db, get relation tweet and user data
note = db.query(RowNoteRecord).filter(RowNoteRecord.tweet_id == "1797617478950170784").first()

# # Noteに紐づくtweetデータを取得
# for note in notes_data:
# note_created_at = note.created_at_millis.serialize()
# if note_created_at >= settings.TARGET_TWITTER_POST_START_UNIX_MILLISECOND and note_created_at <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND: # noqa E501
# tweet_id = note.tweet_id.serialize()
# continue
return
10 changes: 8 additions & 2 deletions etl/src/birdxplorer_etl/lib/sqlite/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker

from birdxplorer_common.storage import RowNoteRecord
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord


def init_db():
Expand All @@ -18,9 +18,15 @@ def init_db():

# 一時データベースのテーブル作成する
# ToDo: noteテーブル以外に必要なものを追加
if not inspect(engine).has_table("note"):
if not inspect(engine).has_table("row_notes"):
logger.info("Creating table note")
RowNoteRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_posts"):
logger.info("Creating table post")
RowPostRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_users"):
logger.info("Creating table user")
RowUserRecord.metadata.create_all(engine)

Session = sessionmaker(bind=engine)

Expand Down
47 changes: 47 additions & 0 deletions etl/src/birdxplorer_etl/lib/x/postlookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import requests
import json
import settings
from prefect import get_run_logger

def create_url(id):
    """Build the X API v2 single-post lookup URL for post *id*.

    Requests every tweet/media/place/user field and expansion the extractor
    consumes downstream, logs the assembled URL, and returns it.
    """
    logger = get_run_logger()
    # Query parameters, joined in the exact order the endpoint URL expects.
    query_parts = (
        "tweet.fields=attachments,author_id,context_annotations,conversation_id,created_at,edit_controls,entities,geo,id,in_reply_to_user_id,lang,public_metrics,possibly_sensitive,referenced_tweets,reply_settings,source,text,withheld",
        'expansions=attachments.poll_ids,attachments.media_keys,author_id,edit_history_tweet_ids,entities.mentions.username,geo.place_id,in_reply_to_user_id,referenced_tweets.id,referenced_tweets.id.author_id',
        'media.fields=duration_ms,height,media_key,preview_image_url,type,url,width,public_metrics,alt_text,variants',
        'place.fields=contained_within,country,country_code,full_name,geo,id,name,place_type',
        'user.fields=created_at,description,entities,id,location,most_recent_tweet_id,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified,verified_type,withheld',
    )
    url = "https://api.twitter.com/2/tweets/{}?{}".format(id, "&".join(query_parts))
    logger.info(url)
    return url


def bearer_oauth(r):
    """Request hook required by bearer-token authentication.

    Attaches the configured bearer token and a client user agent to the
    outgoing request *r*, then returns it (requests auth-callable protocol).
    """
    r.headers.update(
        {
            "Authorization": f"Bearer {settings.X_BEARER_TOKEN}",
            "User-Agent": "v2TweetLookupPython",
        }
    )
    return r


def connect_to_endpoint(url):
    """GET *url* with bearer-token auth and return the decoded JSON body.

    Prints the HTTP status code and raises ``Exception`` (embedding status
    and response text) on any non-200 response.
    """
    response = requests.get(url, auth=bearer_oauth)
    print(response.status_code)
    if response.status_code == 200:
        return response.json()
    raise Exception(f"Request returned an error: {response.status_code} {response.text}")


def lookup(id):
    """Fetch a single post (with all requested expansions) from the X API.

    Dumps the raw JSON response to stdout for debugging and returns it as a
    parsed dict.
    """
    json_response = connect_to_endpoint(create_url(id))
    print(json.dumps(json_response, indent=4, sort_keys=True))
    return json_response

# https://oauth-playground.glitch.me/?id=findTweetsById&params=%28%27query%21%28%27C%27*B%29%7Ebody%21%28%29%7Epath%21%28%29*B%7EFAG%27%7EuserADfile_image_url%2CiNcreated_at%2CconnectK_statuHurlMublic_JtricHuserDtecteNentitieHdescriptK%27%7ECG%2Creferenced_Fs.id-keys-source_F%27%7EOAtype%2Curl%27%29*%7EidL146E37035677698IE43339741184I-%2CattachJnts.O_A.fieldLBE46120540165%27CexpansKLDnaJMroE03237FtweetGauthor_idHs%2CI%2C146JmeKionLs%21%27M%2CpNd%2COJdia%01ONMLKJIHGFEDCBA-*_
7 changes: 4 additions & 3 deletions etl/src/birdxplorer_etl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def extract(db: Session):


@task
def transform():
return transform_data()
def transform(db: Session):
transform_data(db)


@task
Expand All @@ -32,8 +32,9 @@ def load():
def run_etl():
i = initialize()
_ = extract(i["db"])
_ = transform()
_ = transform(i["db"])
_ = load()


if __name__ == "__main__":
run_etl()
11 changes: 9 additions & 2 deletions etl/src/birdxplorer_etl/settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1577836800000
TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1577836799000
import os
from dotenv import load_dotenv

load_dotenv()

TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1717729500000
TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1717729610000

# Extractで何日前のデータを最新と定義するか。開発中は3日前が楽。
COMMUNITY_NOTE_DAYS_AGO = 3

X_BEARER_TOKEN = os.getenv("X_BEARER_TOKEN")
Loading
Loading