-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathq4-github-web-to-gcs.py
54 lines (43 loc) · 1.59 KB
/
q4-github-web-to-gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from pathlib import Path
import pandas as pd
import os
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
from random import randint
@task(retries=3)
def fetch(dataset_url: str) -> pd.DataFrame:
"""Read taxi data from web into pandas Dataframe"""
df = pd.read_csv(dataset_url)
return df
@task(log_prints=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
"""Fix dtype issues"""
df['lpep_pickup_datetime']=pd.to_datetime(df['lpep_pickup_datetime'])
df['lpep_dropoff_datetime']=pd.to_datetime(df['lpep_dropoff_datetime'])
print(df.head(2))
print(f"columns: {df.dtypes}")
print(f"rows: {len(df)}")
return df
@task()
def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path:
"""Write DataFrame out locally as a parquet file"""
folder = Path(f"data/{color}")
os.makedirs(folder, exist_ok=True)
path = Path(f"{folder}/{dataset_file}.parquet")
df.to_parquet(path, compression="gzip", )
return path
@task()
def write_gcs(path: Path) -> None:
gcs_block = GcsBucket.load("zoom-gcs")
gcs_block.upload_from_path(from_path=path, to_path=path)
return
@flow(log_prints=True)
def gh_etl_web_to_gcs(year: int = 2020, month : int = 11, color : str = "green") -> None:
dataset_file=f"{color}_tripdata_{year}-{month:02}"
dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"
df = fetch(dataset_url)
df_clean = clean(df)
path = write_local(df_clean, color, dataset_file)
write_gcs(path)
if __name__ == "__main__":
gh_etl_web_to_gcs()