diff --git a/common/birdxplorer_common/storage.py b/common/birdxplorer_common/storage.py index d71a832..cd07b7a 100644 --- a/common/birdxplorer_common/storage.py +++ b/common/birdxplorer_common/storage.py @@ -49,6 +49,7 @@ class Base(DeclarativeBase): NonNegativeInt: DECIMAL, MediaDetails: JSON, BinaryBool: CHAR, + String: String, } @@ -128,6 +129,46 @@ class RowNoteRecord(Base): harmful: Mapped[NotesHarmful] = mapped_column(nullable=False) validation_difficulty: Mapped[SummaryString] = mapped_column(nullable=False) summary: Mapped[SummaryString] = mapped_column(nullable=False) + row_post_id: Mapped[TweetId] = mapped_column(ForeignKey("row_posts.post_id"), nullable=True) + row_post: Mapped["RowPostRecord"] = relationship("RowPostRecord", back_populates="row_notes") + + +class RowPostRecord(Base): + __tablename__ = "row_posts" + + post_id: Mapped[TweetId] = mapped_column(primary_key=True) + author_id: Mapped[UserId] = mapped_column(ForeignKey("row_users.user_id"), nullable=False) + text: Mapped[SummaryString] = mapped_column(nullable=False) + media_type: Mapped[String] = mapped_column(nullable=True) + media_url: Mapped[String] = mapped_column(nullable=True) + created_at: Mapped[TwitterTimestamp] = mapped_column(nullable=False) + like_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + repost_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + bookmark_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + impression_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + quote_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + reply_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + lang: Mapped[String] = mapped_column() + row_notes: Mapped["RowNoteRecord"] = relationship("RowNoteRecord", back_populates="row_post") + user: Mapped["RowUserRecord"] = relationship("RowUserRecord", back_populates="row_post") + + +class RowUserRecord(Base): + __tablename__ = "row_users" + + user_id: Mapped[UserId] = mapped_column(primary_key=True) + name: Mapped[UserName] = mapped_column(nullable=False) + user_name: Mapped[UserName] = mapped_column(nullable=False) + description: Mapped[SummaryString] = mapped_column(nullable=False) + profile_image_url: Mapped[String] = mapped_column(nullable=False) + followers_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + following_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + tweet_count: Mapped[NonNegativeInt] = mapped_column(nullable=False) + verified: Mapped[BinaryBool] = mapped_column(nullable=False) + verified_type: Mapped[String] = mapped_column(nullable=False) + location: Mapped[String] = mapped_column(nullable=False) + url: Mapped[String] = mapped_column(nullable=False) + row_post: Mapped["RowPostRecord"] = relationship("RowPostRecord", back_populates="user") class Storage: diff --git a/etl/src/birdxplorer_etl/extract.py b/etl/src/birdxplorer_etl/extract.py index a6f3590..3fa9f63 100644 --- a/etl/src/birdxplorer_etl/extract.py +++ b/etl/src/birdxplorer_etl/extract.py @@ -1,20 +1,23 @@ import csv from datetime import datetime, timedelta - import requests import stringcase from prefect import get_run_logger from sqlalchemy.orm import Session - -from birdxplorer_common.storage import RowNoteRecord - +from lib.x.postlookup import lookup +from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord import settings +import time def extract_data(db: Session): logger = get_run_logger() logger.info("Downloading community notes data") + # get columns of post table + columns = db.query(RowUserRecord).statement.columns.keys() + logger.info(columns) + # Noteデータを取得してSQLiteに保存 date = datetime.now() latest_note = db.query(RowNoteRecord).order_by(RowNoteRecord.created_at_millis.desc()).first() @@ -36,20 +39,89 @@ def extract_data(db: Session): reader = csv.DictReader(tsv_data, delimiter="\t") reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames] + rows_to_add = [] for row in reader: - db.add(RowNoteRecord(**row)) + if db.query(RowNoteRecord).filter(RowNoteRecord.note_id == row["note_id"]).first(): + continue + rows_to_add.append(RowNoteRecord(**row)) + db.bulk_save_objects(rows_to_add) + break date = date - timedelta(days=1) db.commit() - row1 = db.query(RowNoteRecord).first() - logger.info(row1) + # post = lookup() + # created_at = datetime.strptime(post["data"]["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ") + # created_at_millis = int(created_at.timestamp() * 1000) + # db_post = RowPostRecord(post_id=post["data"]["id"], author_id=post["data"]["author_id"], text=post["data"]["text"], created_at=created_at_millis,like_count=post["data"]["public_metrics"]["like_count"],repost_count=post["data"]["public_metrics"]["retweet_count"],bookmark_count=post["data"]["public_metrics"]["bookmark_count"],impression_count=post["data"]["public_metrics"]["impression_count"],quote_count=post["data"]["public_metrics"]["quote_count"],reply_count=post["data"]["public_metrics"]["reply_count"],lang=post["data"]["lang"]) + # db.add(db_post) + # db.commit() + + # Noteに紐づくtweetデータを取得 + postExtract_targetNotes = ( + db.query(RowNoteRecord) + .filter(RowNoteRecord.tweet_id != None) + .filter(RowNoteRecord.created_at_millis >= settings.TARGET_TWITTER_POST_START_UNIX_MILLISECOND) + .filter(RowNoteRecord.created_at_millis <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND) + .all() + ) + logger.info(len(postExtract_targetNotes)) + for note in postExtract_targetNotes: + tweet_id = note.tweet_id + + is_tweetExist = db.query(RowPostRecord).filter(RowPostRecord.post_id == tweet_id).first() + if is_tweetExist is not None: + note.row_post_id = tweet_id + continue + + logger.info(tweet_id) + post = lookup(tweet_id) + created_at = datetime.strptime(post["data"]["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ") + created_at_millis = int(created_at.timestamp() * 1000) + + is_userExist = db.query(RowUserRecord).filter(RowUserRecord.user_id == post["data"]["author_id"]).first() + logger.info(is_userExist) + if is_userExist is None: + db_user = RowUserRecord( + user_id=post["data"]["author_id"], + name=post["includes"]["users"][0]["name"], + user_name=post["includes"]["users"][0]["username"], + description=post["includes"]["users"][0]["description"], + profile_image_url=post["includes"]["users"][0]["profile_image_url"], + followers_count=post["includes"]["users"][0]["public_metrics"]["followers_count"], + following_count=post["includes"]["users"][0]["public_metrics"]["following_count"], + tweet_count=post["includes"]["users"][0]["public_metrics"]["tweet_count"], + verified=post["includes"]["users"][0]["verified"], + verified_type=post["includes"]["users"][0]["verified_type"], + location=post["includes"]["users"][0]["location"], + url=post["includes"]["users"][0]["url"], + ) + db.add(db_user) + + media_url = post["includes"]["media"][0]["url"] + db_post = RowPostRecord( + post_id=post["data"]["id"], + author_id=post["data"]["author_id"], + text=post["data"]["text"], + media_type=post["includes"]["media"][0]["type"], + media_url=media_url, + created_at=created_at_millis, + like_count=post["data"]["public_metrics"]["like_count"], + repost_count=post["data"]["public_metrics"]["retweet_count"], + bookmark_count=post["data"]["public_metrics"]["bookmark_count"], + impression_count=post["data"]["public_metrics"]["impression_count"], + quote_count=post["data"]["public_metrics"]["quote_count"], + reply_count=post["data"]["public_metrics"]["reply_count"], + lang=post["data"]["lang"], + ) + db.add(db_post) + note.row_post_id = tweet_id + time.sleep(1) + continue + db.commit() + + # select note from db, get relation tweet and user data + note = db.query(RowNoteRecord).filter(RowNoteRecord.tweet_id == "1797617478950170784").first() - # # Noteに紐づくtweetデータを取得 - # for note in notes_data: - # note_created_at = note.created_at_millis.serialize() - # if note_created_at >= settings.TARGET_TWITTER_POST_START_UNIX_MILLISECOND and note_created_at <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND: # noqa E501 - # tweet_id = note.tweet_id.serialize() - # continue return diff --git a/etl/src/birdxplorer_etl/lib/sqlite/init.py b/etl/src/birdxplorer_etl/lib/sqlite/init.py index 8dd8477..c167352 100644 --- a/etl/src/birdxplorer_etl/lib/sqlite/init.py +++ b/etl/src/birdxplorer_etl/lib/sqlite/init.py @@ -5,7 +5,7 @@ from sqlalchemy import create_engine, inspect from sqlalchemy.orm import sessionmaker -from birdxplorer_common.storage import RowNoteRecord +from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord def init_db(): @@ -18,9 +18,15 @@ def init_db(): # 一時データベースのテーブル作成する # ToDo: noteテーブル以外に必要なものを追加 - if not inspect(engine).has_table("note"): + if not inspect(engine).has_table("row_notes"): logger.info("Creating table note") RowNoteRecord.metadata.create_all(engine) + if not inspect(engine).has_table("row_posts"): + logger.info("Creating table post") + RowPostRecord.metadata.create_all(engine) + if not inspect(engine).has_table("row_users"): + logger.info("Creating table user") + RowUserRecord.metadata.create_all(engine) Session = sessionmaker(bind=engine) diff --git a/etl/src/birdxplorer_etl/lib/x/postlookup.py b/etl/src/birdxplorer_etl/lib/x/postlookup.py new file mode 100644 index 0000000..f949492 --- /dev/null +++ b/etl/src/birdxplorer_etl/lib/x/postlookup.py @@ -0,0 +1,47 @@ +import requests +import json +import settings +from prefect import get_run_logger + +def create_url(id): + logger = get_run_logger() + expansions = 'expansions=attachments.poll_ids,attachments.media_keys,author_id,edit_history_tweet_ids,entities.mentions.username,geo.place_id,in_reply_to_user_id,referenced_tweets.id,referenced_tweets.id.author_id' + tweet_fields = "tweet.fields=attachments,author_id,context_annotations,conversation_id,created_at,edit_controls,entities,geo,id,in_reply_to_user_id,lang,public_metrics,possibly_sensitive,referenced_tweets,reply_settings,source,text,withheld" + media_fields = 'media.fields=duration_ms,height,media_key,preview_image_url,type,url,width,public_metrics,alt_text,variants' + place_fields = 'place.fields=contained_within,country,country_code,full_name,geo,id,name,place_type' + user_fields = 'user.fields=created_at,description,entities,id,location,most_recent_tweet_id,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified,verified_type,withheld' + + url = "https://api.twitter.com/2/tweets/{}?{}&{}&{}&{}&{}".format(id, tweet_fields, expansions,media_fields,place_fields,user_fields) + logger.info(url) + return url + + +def bearer_oauth(r): + """ + Method required by bearer token authentication. + """ + + r.headers["Authorization"] = f"Bearer {settings.X_BEARER_TOKEN}" + r.headers["User-Agent"] = "v2TweetLookupPython" + return r + + +def connect_to_endpoint(url): + response = requests.request("GET", url, auth=bearer_oauth) + print(response.status_code) + if response.status_code != 200: + raise Exception( + "Request returned an error: {} {}".format( + response.status_code, response.text + ) + ) + return response.json() + + +def lookup(id): + url = create_url(id) + json_response = connect_to_endpoint(url) + print(json.dumps(json_response, indent=4, sort_keys=True)) + return json_response + +# https://oauth-playground.glitch.me/?id=findTweetsById¶ms=%28%27query%21%28%27C%27*B%29%7Ebody%21%28%29%7Epath%21%28%29*B%7EFAG%27%7EuserADfile_image_url%2CiNcreated_at%2CconnectK_statuHurlMublic_JtricHuserDtecteNentitieHdescriptK%27%7ECG%2Creferenced_Fs.id-keys-source_F%27%7EOAtype%2Curl%27%29*%7EidL146E37035677698IE43339741184I-%2CattachJnts.O_A.fieldLBE46120540165%27CexpansKLDnaJMroE03237FtweetGauthor_idHs%2CI%2C146JmeKionLs%21%27M%2CpNd%2COJdia%01ONMLKJIHGFEDCBA-*_ \ No newline at end of file diff --git a/etl/src/birdxplorer_etl/main.py b/etl/src/birdxplorer_etl/main.py index 4ec2db6..87d131d 100644 --- a/etl/src/birdxplorer_etl/main.py +++ b/etl/src/birdxplorer_etl/main.py @@ -19,8 +19,8 @@ def extract(db: Session): @task -def transform(): - return transform_data() +def transform(db: Session): + transform_data(db) @task @@ -32,8 +32,9 @@ def load(): def run_etl(): i = initialize() _ = extract(i["db"]) - _ = transform() + _ = transform(i["db"]) _ = load() + if __name__ == "__main__": run_etl() diff --git a/etl/src/birdxplorer_etl/settings.py b/etl/src/birdxplorer_etl/settings.py index 9541f55..30a4e6e 100644 --- a/etl/src/birdxplorer_etl/settings.py +++ b/etl/src/birdxplorer_etl/settings.py @@ -1,5 +1,12 @@ -TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1577836800000 -TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1577836799000 +import os +from dotenv import load_dotenv + +load_dotenv() + +TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1717729500000 +TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1717729610000 # Extractで何日前のデータを最新と定義するか。開発中は3日前が楽。 COMMUNITY_NOTE_DAYS_AGO = 3 + +X_BEARER_TOKEN = os.getenv("X_BEARER_TOKEN") diff --git a/etl/src/birdxplorer_etl/transform.py b/etl/src/birdxplorer_etl/transform.py index da2d7c8..40cd00b 100644 --- a/etl/src/birdxplorer_etl/transform.py +++ b/etl/src/birdxplorer_etl/transform.py @@ -1,6 +1,110 @@ import logging +from sqlalchemy import select, func +from sqlalchemy.orm import Session +from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord +import csv +import os -def transform_data(): +def transform_data(db: Session): logging.info("Transforming data") + + if not os.path.exists("./data/transformed"): + os.makedirs("./data/transformed") + + # Transform row note data and generate note.csv + if os.path.exists("./data/transformed/note.csv"): + os.remove("./data/transformed/note.csv") + + offset = 0 + limit = 1000 + + num_of_notes = db.query(func.count(RowNoteRecord.note_id)).scalar() + + while offset < num_of_notes: + notes = db.execute( + select( + RowNoteRecord.note_id, RowNoteRecord.row_post_id, RowNoteRecord.summary, RowNoteRecord.created_at_millis + ) + .limit(limit) + .offset(offset) + ) + + with open("./data/transformed/note.csv", "a") as file: + writer = csv.writer(file) + writer.writerow(["note_id", "post_id", "summary", "created_at"]) + for note in notes: + writer.writerow(note) + offset += limit + + # Transform row post data and generate post.csv + if os.path.exists("./data/transformed/post.csv"): + os.remove("./data/transformed/post.csv") + + offset = 0 + limit = 1000 + + num_of_posts = db.query(func.count(RowPostRecord.post_id)).scalar() + + while offset < num_of_posts: + posts = db.execute( + select( + RowPostRecord.post_id, + RowPostRecord.author_id.label("user_id"), + RowPostRecord.text, + RowPostRecord.created_at, + RowPostRecord.like_count, + RowPostRecord.repost_count, + RowPostRecord.impression_count, + ) + .limit(limit) + .offset(offset) + ) + + with open("./data/transformed/post.csv", "a") as file: + writer = csv.writer(file) + writer.writerow( + ["post_id", "user_id", "text", "created_at", "like_count", "repost_count", "impression_count"] + ) + for post in posts: + writer.writerow(post) + offset += limit + + # Transform row user data and generate user.csv + if os.path.exists("./data/transformed/user.csv"): + os.remove("./data/transformed/user.csv") + + offset = 0 + limit = 1000 + + num_of_users = db.query(func.count(RowUserRecord.user_id)).scalar() + + while offset < num_of_users: + users = db.execute( + select( + RowUserRecord.user_id, + RowUserRecord.user_name.label("name"), + RowUserRecord.profile_image_url.label("profile_image"), + RowUserRecord.followers_count, + RowUserRecord.following_count, + ) + .limit(limit) + .offset(offset) + ) + + with open("./data/transformed/user.csv", "a") as file: + writer = csv.writer(file) + writer.writerow( + [ + "user_id", + "name", + "profile_image", + "followers_count", + "following_count", + ] + ) + for user in users: + writer.writerow(user) + offset += limit + return