diff --git a/f/examples/parquet_etl.flow/do_analytics_and_build_plot.inline_script.py b/f/examples/parquet_etl.flow/do_analytics_and_build_plot.inline_script.py deleted file mode 100644 index 8bd457c..0000000 --- a/f/examples/parquet_etl.flow/do_analytics_and_build_plot.inline_script.py +++ /dev/null @@ -1,70 +0,0 @@ -import matplotlib.pyplot as plt -import pandas as pd -import fastparquet -import base64 - -def main(path: str): - fig, ax = plt.subplots(figsize=(15, 6), dpi=200) - - # Colors - normal_color = '#cf1111' - anomaly_color = '#ffe4ad' - - # Subset our data for plotting, and separate into normal and anomalous - to_plot = pd.read_parquet( - path, - columns=['new_cases', 'new_cases_ma7', 'anomaly']).loc['2020-03':] - normal = to_plot.query('not anomaly') - anomalous = to_plot.query('anomaly') - - # Add new cases rolling average and annotation - to_plot['new_cases_ma7'].plot(color=normal_color, ax=ax) - - label_date = '2020-07-26' - ax.annotate('7-day\naverage', - xy=(label_date, to_plot['new_cases_ma7'].loc[label_date]), - xytext=(0, 16), textcoords='offset points', - horizontalalignment='center', - arrowprops=dict(arrowstyle="-"), - color=normal_color) - - # Add new cases bar plot and annotation - ax.bar(normal.index, normal['new_cases'], width=0.6, color=normal_color, - alpha=0.3) - ax.bar(anomalous.index, anomalous['new_cases'], width=0.6, color=anomaly_color) - - label_date = normal.index[normal.new_cases.argmax()] - ax.annotate('New\ncases', - xy=(label_date, to_plot['new_cases'].loc[label_date]), - xytext=(-20, 0), textcoords='offset points', - horizontalalignment='right', - verticalalignment='center', - arrowprops=dict(arrowstyle="-"), - color=normal_color) - - # Add and annotate horizontal grid lines - ax.hlines(100_000, to_plot.index[0], to_plot.index[-1], color='lightgray', - linestyle='--', linewidth=1) - ax.hlines(200_000, to_plot.index[0], to_plot.index[-1], color='lightgray', - linestyle='--', linewidth=1) - - ax.annotate('200,000 cases', - xy=('2020-03-01', 200_000), - xytext=(0, 5), textcoords='offset points', - fontsize='large', color='gray') - ax.annotate('100,000', - xy=('2020-03-01', 100_000), - xytext=(0, 5), textcoords='offset points', - fontsize='large', color='gray') - - # Styling - [ax.spines[spine].set_visible(False) - for spine in ['top', 'right', 'bottom', 'left']] - ax.yaxis.set_ticks([]) - ax.minorticks_off() - ax.set_xlabel(''); - plt.savefig('out.png') - with open('out.png', "rb") as image_file: - encoded_string = base64.b64encode(image_file.read()).decode('ascii') - - return {"png": encoded_string} diff --git a/f/examples/parquet_etl.flow/flow.yaml b/f/examples/parquet_etl.flow/flow.yaml deleted file mode 100644 index 250052c..0000000 --- a/f/examples/parquet_etl.flow/flow.yaml +++ /dev/null @@ -1,63 +0,0 @@ -summary: >- - Small ETL using same_worker to share parquet across steps and do data - analytics -description: >- - This flow uses the same_worker setting to share data in the '/shared' folder. - - The flow consists in getting covid-19 data from a csv, store it in parquet and - then build a plot out of it -value: - modules: - - id: a - summary: Get the covid-19 data and store it as parquet - value: - type: rawscript - content: '!inline get_the_covid-19_data_and_store_it_as_parquet.inline_script.py' - input_transforms: {} - lock: |- - cramjam==2.8.0 - fastparquet==2023.10.1 - fsspec==2023.12.2 - numpy==1.26.3 - packaging==23.2 - pandas==2.2.0 - python-dateutil==2.8.2 - pytz==2023.3.post1 - six==1.16.0 - tzdata==2023.4 - language: python3 - - id: b - summary: Do analytics and build plot - value: - type: rawscript - content: '!inline do_analytics_and_build_plot.inline_script.py' - input_transforms: - path: - type: javascript - expr: results.a - lock: |- - contourpy==1.2.0 - cramjam==2.8.0 - cycler==0.12.1 - fastparquet==2023.10.1 - fonttools==4.47.2 - fsspec==2023.12.2 - kiwisolver==1.4.5 - matplotlib==3.8.2 - numpy==1.26.3 - packaging==23.2 - pandas==2.2.0 - pillow==10.2.0 - pyparsing==3.1.1 - python-dateutil==2.8.2 - pytz==2023.3.post1 - six==1.16.0 - tzdata==2023.4 - language: python3 - same_worker: true -schema: - $schema: 'https://json-schema.org/draft/2020-12/schema' - type: object - properties: {} - required: [] -ws_error_handler_muted: false diff --git a/f/examples/parquet_etl.flow/get_the_covid-19_data_and_store_it_as_parquet.inline_script.py b/f/examples/parquet_etl.flow/get_the_covid-19_data_and_store_it_as_parquet.inline_script.py deleted file mode 100644 index 5ff572a..0000000 --- a/f/examples/parquet_etl.flow/get_the_covid-19_data_and_store_it_as_parquet.inline_script.py +++ /dev/null @@ -1,22 +0,0 @@ -import pandas as pd -import pathlib -import fastparquet - -def main(): - nytimes = pd.read_csv( - 'https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv', - index_col=['date'], parse_dates=['date']).asfreq(freq='D') - - # Transform: add new columns with useful transformations - nytimes['new_cases'] = nytimes['cases'].diff() - nytimes['new_cases_ma7'] = nytimes['new_cases'].rolling(7).mean() - - anomalous_dates = ['2020-09-21', '2020-11-26', '2020-12-11', '2020-12-25'] - nytimes['anomaly'] = False - nytimes.loc[anomalous_dates, 'anomaly'] = True - - nytimes_path = pathlib.Path('./shared/nytimes') - nytimes_path.mkdir(parents=True, exist_ok=True) - path = nytimes_path / 'us.parquet' - nytimes.to_parquet(path) - return path \ No newline at end of file