Vector udf via regular rdd #1025
base: master
Conversation
@soxofaan I plan on merging this next week, feel free to review.
FYI: I just merged master into the feature branch to trigger another Jenkins build; it's still failing though.
some quick comments from initial review
#pyspark.pandas.set_option('compute.max_rows', None)
id_index = columns.index("feature_index")

def mapTimeseriesRows(id_bands):
Suggested change:
- def mapTimeseriesRows(id_bands):
+ def map_timeseries_rows(id_bands):
Is it also possible to give id_bands a type annotation? It's not very clear what this variable currently is.
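Something like this could work, purely as a sketch, assuming the groupByKey output is a (feature_index, iterable of rows-as-lists) pair and the mapper ends up returning a dict per feature:

from typing import Any, Iterable, List, Tuple

def map_timeseries_rows(id_bands: Tuple[Any, Iterable[List]]) -> dict:
    # id_bands comes from groupByKey: (feature_index, iterable of rows as plain lists)
    feature_index, bands = id_bands
    ...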
processed_df = csv_df.groupby("feature_index").apply(callback).reset_index()
csv_as_list = df.rdd.map(list).map(lambda x: (x[id_index],x)).groupByKey().map(mapTimeseriesRows)
Suggested change:
- csv_as_list = df.rdd.map(list).map(lambda x: (x[id_index],x)).groupByKey().map(mapTimeseriesRows)
+ csv_as_list = df.rdd.map(list).map(lambda x: (x[id_index],x)).groupByKey().map(map_timeseries_rows)
I'm still figuring this out, but csv_as_list seems a bit confusing: this isn't a list of CSV rows anymore, but an RDD of dicts, right?
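For what it's worth, spelling out what each step yields might also help with the naming; roughly (timeseries_per_feature is just a hypothetical alternative name for csv_as_list):

# df.rdd.map(list)                  -> RDD of rows as plain lists
# .map(lambda x: (x[id_index], x))  -> RDD of (feature_index, row) pairs
# .groupByKey()                     -> RDD of (feature_index, iterable of rows)
# .map(map_timeseries_rows)         -> RDD of dicts (assuming that's what the mapper returns)
timeseries_per_feature = (df.rdd.map(list)
                          .map(lambda x: (x[id_index], x))
                          .groupByKey()
                          .map(map_timeseries_rows))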
# TODO: also pass feature_index to udf?
processed = udf_function(feature_data)
bands = id_bands[1]
import pandas as pd
This local import shouldn't be necessary, right?
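If pandas is needed there at all, it could presumably just move next to the other module-level imports:

# at module level, rather than inside the mapped function
import pandas as pd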
feature_data.index = feature_data.index.strftime("%Y-%m-%dT%H:%M:%SZ")
#if "date" in feature_data.columns:
# feature_data = feature_data.set_index("date")
#feature_data.index = feature_data.index.strftime("%Y-%m-%dT%H:%M:%SZ")
Can't we remove these lines of code instead of commenting them out? What is the use of keeping them?