elt_pipeline.py
import os

import luigi
import pandas as pd
import sentry_sdk

from pipeline.extract import Extract
from pipeline.load import Load
from pipeline.transform import Transform
from pipeline.utils.concat_dataframe import concat_dataframes
from pipeline.utils.copy_log import copy_log
from pipeline.utils.delete_temp_data import delete_temp

# Read environment variables
DIR_ROOT_PROJECT = os.getenv("DIR_ROOT_PROJECT")
DIR_TEMP_LOG = os.getenv("DIR_TEMP_LOG")
DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
DIR_LOG = os.getenv("DIR_LOG")
SENTRY_DSN = os.getenv("SENTRY_DSN")

# Report errors to Sentry
sentry_sdk.init(dsn=SENTRY_DSN)

# Run the pipeline when the script is executed directly
if __name__ == "__main__":
    # Build and run the Luigi tasks
    luigi.build([Extract(), Load(), Transform()])

    # Append the temp extract summary to the final summary
    concat_dataframes(
        df1=pd.read_csv(f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'),
        df2=pd.read_csv(f'{DIR_TEMP_DATA}/extract-summary.csv')
    )

    # Append the temp load summary to the final summary
    concat_dataframes(
        df1=pd.read_csv(f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'),
        df2=pd.read_csv(f'{DIR_TEMP_DATA}/load-summary.csv')
    )

    # Append the temp transform summary to the final summary
    concat_dataframes(
        df1=pd.read_csv(f'{DIR_ROOT_PROJECT}/pipeline_summary.csv'),
        df2=pd.read_csv(f'{DIR_TEMP_DATA}/transform-summary.csv')
    )

    # Append the temp log to the final log
    copy_log(
        source_file=f'{DIR_TEMP_LOG}/logs.log',
        destination_file=f'{DIR_LOG}/logs.log'
    )

    # Delete the temp data directory
    delete_temp(directory=DIR_TEMP_DATA)

    # Delete the temp log directory
    delete_temp(directory=DIR_TEMP_LOG)
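
The concat_dataframes, copy_log, and delete_temp helpers are imported from pipeline/utils and are not shown on this page. Below is a minimal sketch of what concat_dataframes might look like, assuming it appends df2 to df1 and writes the combined summary back to pipeline_summary.csv itself (an assumption, since elt_pipeline.py never uses the return value); the actual implementation in the repository may differ.

# Hypothetical sketch of pipeline/utils/concat_dataframe.py -- not the repository's actual code.
# Assumes the helper persists the combined summary itself, mirroring the df1 path used by the caller.
import os

import pandas as pd


def concat_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Append df2 to df1 and write the result back to the final summary CSV."""
    combined = pd.concat([df1, df2], ignore_index=True)
    # The destination path is an assumption, based on how elt_pipeline.py reads df1.
    combined.to_csv(
        f"{os.getenv('DIR_ROOT_PROJECT')}/pipeline_summary.csv",
        index=False,
    )
    return combined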