Skip to content

Commit

Permalink
Merge pull request #168 from catalyst-cooperative/kaggle-github-etl
Browse files Browse the repository at this point in the history
ETL Kaggle and Github metrics
  • Loading branch information
e-belfer authored Oct 1, 2024
2 parents 7b18396 + 105e5ee commit d737698
Show file tree
Hide file tree
Showing 16 changed files with 1,094 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""create initial schema
"""Recreate first migration
Revision ID: 23e83521c288
Revision ID: e8435a653eb2
Revises:
Create Date: 2024-09-12 12:16:34.908298
Create Date: 2024-09-20 12:53:39.454455
"""
from typing import Sequence, Union
Expand All @@ -12,14 +12,124 @@


# revision identifiers, used by Alembic.
revision: str = '23e83521c288'
revision: str = 'e8435a653eb2'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('core_github_clones',
sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date for each metrics snapshot.'),
sa.Column('total_clones', sa.Integer(), nullable=True),
sa.Column('unique_clones', sa.Integer(), nullable=True),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date')
)
op.create_table('core_github_forks',
sa.Column('id', sa.Integer(), nullable=False, comment='The unique identifier for each fork.'),
sa.Column('node_id', sa.String(), nullable=True),
sa.Column('name', sa.String(), nullable=True),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('private', sa.Boolean(), nullable=True),
sa.Column('owner', sa.String(), nullable=True),
sa.Column('description', sa.String(), nullable=True),
sa.Column('url', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('pushed_at', sa.DateTime(), nullable=True),
sa.Column('homepage', sa.String(), nullable=True),
sa.Column('size_kb', sa.Integer(), nullable=True),
sa.Column('stargazers_count', sa.Integer(), nullable=True),
sa.Column('watchers_count', sa.Integer(), nullable=True),
sa.Column('language', sa.String(), nullable=True),
sa.Column('has_issues', sa.Boolean(), nullable=True),
sa.Column('has_projects', sa.Boolean(), nullable=True),
sa.Column('has_downloads', sa.Boolean(), nullable=True),
sa.Column('has_wiki', sa.Boolean(), nullable=True),
sa.Column('has_pages', sa.Boolean(), nullable=True),
sa.Column('has_discussions', sa.Boolean(), nullable=True),
sa.Column('forks_count', sa.Integer(), nullable=True),
sa.Column('archived', sa.Boolean(), nullable=True),
sa.Column('disabled', sa.Boolean(), nullable=True),
sa.Column('license', sa.String(), nullable=True),
sa.Column('allow_forking', sa.Boolean(), nullable=True),
sa.Column('is_template', sa.Boolean(), nullable=True),
sa.Column('web_commit_signoff_required', sa.Boolean(), nullable=True),
sa.Column('topics', sa.String(), nullable=True),
sa.Column('visibility', sa.String(), nullable=True),
sa.Column('forks', sa.Integer(), nullable=True),
sa.Column('open_issues', sa.Integer(), nullable=True),
sa.Column('watchers', sa.Integer(), nullable=True),
sa.Column('default_branch', sa.String(), nullable=True),
sa.Column('permissions', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('core_github_popular_paths',
sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date for each metrics snapshot.'),
sa.Column('path', sa.String(), nullable=False, comment='One of the ten most popular Github paths on a given date.'),
sa.Column('title', sa.String(), nullable=True),
sa.Column('total_views', sa.Integer(), nullable=True),
sa.Column('unique_views', sa.Integer(), nullable=True),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date', 'path')
)
op.create_table('core_github_popular_referrers',
sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date for each metrics snapshot.'),
sa.Column('referrer', sa.String(), nullable=False, comment='The unique referrer.'),
sa.Column('total_referrals', sa.Integer(), nullable=True),
sa.Column('unique_referrals', sa.Integer(), nullable=True),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date', 'referrer')
)
op.create_table('core_github_stargazers',
sa.Column('id', sa.Integer(), nullable=False, comment='The unique identifier for each stargazer.'),
sa.Column('starred_at', sa.DateTime(), nullable=True),
sa.Column('login', sa.String(), nullable=True),
sa.Column('node_id', sa.String(), nullable=True),
sa.Column('url', sa.String(), nullable=True),
sa.Column('html_url', sa.String(), nullable=True),
sa.Column('followers_url', sa.String(), nullable=True),
sa.Column('following_url', sa.String(), nullable=True),
sa.Column('gists_url', sa.String(), nullable=True),
sa.Column('starred_url', sa.String(), nullable=True),
sa.Column('subscriptions_url', sa.String(), nullable=True),
sa.Column('organizations_url', sa.String(), nullable=True),
sa.Column('repos_url', sa.String(), nullable=True),
sa.Column('events_url', sa.String(), nullable=True),
sa.Column('received_events_url', sa.String(), nullable=True),
sa.Column('type', sa.String(), nullable=True),
sa.Column('site_admin', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('core_github_views',
sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date for each metrics snapshot.'),
sa.Column('total_views', sa.Integer(), nullable=True),
sa.Column('unique_views', sa.Integer(), nullable=True),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date')
)
op.create_table('core_kaggle_logs',
sa.Column('metrics_date', sa.DateTime(), nullable=False, comment='The unique date for each metrics snapshot.'),
sa.Column('total_views', sa.Integer(), nullable=True),
sa.Column('total_downloads', sa.Integer(), nullable=True),
sa.Column('total_votes', sa.Integer(), nullable=True),
sa.Column('usability_rating', sa.Float(), nullable=True),
sa.Column('dataset_name', sa.String(), nullable=True),
sa.Column('owner', sa.String(), nullable=True),
sa.Column('title', sa.String(), nullable=True),
sa.Column('subtitle', sa.String(), nullable=True),
sa.Column('description', sa.String(), nullable=True),
sa.Column('keywords', sa.String(), nullable=True),
sa.Column('dataset_id', sa.String(), nullable=True),
sa.Column('is_private', sa.String(), nullable=True),
sa.Column('licenses', sa.String(), nullable=True),
sa.Column('collaborators', sa.String(), nullable=True),
sa.Column('data', sa.String(), nullable=True),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date')
)
op.create_table('core_s3_logs',
sa.Column('id', sa.String(), nullable=False, comment='A unique ID for each log.'),
sa.Column('time', sa.DateTime(), nullable=True),
Expand Down Expand Up @@ -202,4 +312,11 @@ def downgrade() -> None:
op.drop_table('intake_logs')
op.drop_table('datasette_request_logs')
op.drop_table('core_s3_logs')
op.drop_table('core_kaggle_logs')
op.drop_table('core_github_views')
op.drop_table('core_github_stargazers')
op.drop_table('core_github_popular_referrers')
op.drop_table('core_github_popular_paths')
op.drop_table('core_github_forks')
op.drop_table('core_github_clones')
# ### end Alembic commands ###
18 changes: 13 additions & 5 deletions run_data_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,29 @@ def main():
log_format = "%(asctime)s [%(levelname)8s] %(name)s:%(lineno)s %(message)s"
coloredlogs.install(fmt=log_format, level="INFO", logger=usage_metrics_logger)

job = defs.get_job_def(name="all_metrics_etl")
usage_metrics_logger.info(
f"""Saving to {os.getenv("METRICS_PROD_ENV", "local")} database."""
)
# Run the partitioned metrics
job = defs.get_job_def(name="all_partitioned_metrics_etl")

# Get last complete weekly partition
most_recent_partition = max(job.partitions_def.get_partition_keys())

# Run the jobs
usage_metrics_logger.info(
f"""Processing data from the week of {most_recent_partition} for {job.name}."""
)
usage_metrics_logger.info(
f"""Saving to {os.getenv("METRICS_PROD_ENV", "local")} database."""
f"""{job.name}: Processing partitioned data from the week of {most_recent_partition}."""
)

job.execute_in_process(partition_key=most_recent_partition)

# Run the non-partitioned metrics
usage_metrics_logger.info(
f"""{job.name}: Processing the most recent non-partitioned data."""
)
job = defs.get_job_def(name="all_nonpartitioned_metrics_etl")
job.execute_in_process()


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion src/usage_metrics/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Module contains assets that transform data into core assets."""

from . import s3
from . import github_nonpartitioned, github_partitioned, kaggle, s3
77 changes: 77 additions & 0 deletions src/usage_metrics/core/github_nonpartitioned.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Transform data from Github logs for stargazer and fork data."""

import os

import pandas as pd
from dagster import (
AssetExecutionContext,
asset,
)


@asset(
io_manager_key="database_manager",
tags={"source": "github_nonpartitioned"},
)
def core_github_forks(
context: AssetExecutionContext,
raw_github_forks: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the forks to the PUDL Github repository."""
df = raw_github_forks

# Drop 'fork' column and all URLs other than the main one for the repository
# Also drop "open_issues_count" which is identical to "open_issues"
df = df.rename(columns={"size": "size_kb"}).drop(
columns=[
col
for col in df.columns
if "_url" in col or col == "fork" or col == "open_issues_count"
]
)

# Convert string to datetime using Pandas
df[["created_at", "updated_at", "pushed_at"]] = df[
["created_at", "updated_at", "pushed_at"]
].apply(pd.to_datetime)

# Check validity of PK column
df = df.set_index("id")
assert df.index.is_unique

# For now, dump dictionaries and lists into a string
# We don't need these for metrics and expect them to stay essentially the same over time.
df[["owner", "permissions", "license", "topics"]] = df[
["owner", "permissions", "license", "topics"]
].astype(str)

context.log.info(f"Saving to {os.getenv("METRICS_PROD_ENV", "local")} environment.")

return df.reset_index()


@asset(
io_manager_key="database_manager",
tags={"source": "github_nonpartitioned"},
)
def core_github_stargazers(
context: AssetExecutionContext,
raw_github_stargazers: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the stargazers to the PUDL Github repository."""
df = raw_github_stargazers

# Drop 'fork' column and all URLs other than the main one for the repository
# Also drop "open_issues_count" which is identical to "open_issues"
df = df.rename({"size": "size_kb"}).drop(columns=["gravatar_id", "avatar_url"])

# Convert string to datetime using Pandas
df["starred_at"] = pd.to_datetime(df["starred_at"])

# Check validity of PK column
df = df.set_index("id")
assert df.index.is_unique

context.log.info(f"Saving to {os.getenv("METRICS_PROD_ENV", "local")} environment.")

return df.reset_index()
Loading

0 comments on commit d737698

Please sign in to comment.