Closes #2880: Add parallel writing when writing pdarrays to Parquet #2881
base: master
Conversation
Looks good
A whole lot of comments from me all just to say... LGTM lol
Nice work ben! This looks great!! 🚀
@@ -417,6 +419,7 @@ module ParquetMsg {
dtype, compression,
errMsg): int;
var dtypeRep = toCDtype(dtype);
var doParallel = if A.size > parallelWriteThreshold then true else false;
Okay, this is pedantic, but this could just be `var doParallel = A.size > parallelWriteThreshold;`, right?
var fileSizes: [0..#loc.maxTaskPar] int = locDom.size/loc.maxTaskPar;
// First file has the extra elements if it isn't evenly divisible by maxTaskPar
fileSizes[0] += locDom.size - ((locDom.size/loc.maxTaskPar)*loc.maxTaskPar);
I had to convince myself that the integer division would always round down. This seems to check out, and the adjustment gave what I expected on a small example!
var fileSizes: [0..#6] int = 10/6; // 1.6666 verify this rounds down
writeln(fileSizes);
var leftOver = 10 - ((10/6)*6);
writeln(leftOver);
Output:
1 1 1 1 1 1
4
I will say that in my small example this resulted in a pretty unbalanced distribution, but I think that in a real case `locDom.size` would be large enough relative to `loc.maxTaskPar` that it would be pretty uniform... I'm just now realizing that's probably part of the motivation for having a `parallelWriteThreshold` lol
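For reference, applying the adjustment from the diff to that same small example (hard-coded numbers, just an illustration, not the PR's code) shows the first file absorbing the entire remainder:

```chapel
var fileSizes: [0..#6] int = 10/6;   // integer division: every file starts at 1
fileSizes[0] += 10 - ((10/6)*6);     // the first file absorbs the remainder of 4
writeln(fileSizes);                  // prints: 5 1 1 1 1 1
```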
Since `leftOver` is the remainder of `locDom.size/loc.maxTaskPar`, it should be guaranteed to be less than `fileSizes.size`. So if we wanted to distribute the remainder more evenly, we could do something like
var fileSizes: [0..#6] int = 10/6;
writeln(fileSizes);
var leftOver = 10 - ((10/6)*6);
writeln(leftOver);
fileSizes[0..#leftOver] += 1;
writeln(fileSizes);
Output:
1 1 1 1 1 1
4
2 2 2 2 1 1
But this is probably overengineering something that isn't actually a problem
var suffix = '%04i'.format(idx): string;
var parSuffix = '%04i'.format(i): string;
const parFilename = filename + "_LOCALE" + suffix + "_CORE" + parSuffix + ".parquet";
var oi = if i == 0 then i else offsets[i-1];
Instead of this back-by-one indexing, couldn't you do
var offsets = (+ scan fileSizes) - fileSizes;
forall (i, off, len) in zip(fileSizes.domain, offsets, fileSizes) {
...
I don't think this would make any performance difference, but it is more similar to how we calculate offsets in other places. I normally prefer loop variables over indexing when possible because it makes it easier for me to tell at a glance what's local, but that doesn't apply here. So there's definitely no need to change.
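For what it's worth, here is a tiny self-contained sketch of that offsets pattern (hard-coded sizes, and a serial for loop only so the printed order is deterministic; not the PR's code):

```chapel
var fileSizes: [0..#6] int = 1;
fileSizes[0] += 4;                              // e.g. sizes 5 1 1 1 1 1
// exclusive prefix sum: each chunk's starting offset into the local array
var offsets = (+ scan fileSizes) - fileSizes;   // 0 5 6 7 8 9
for (i, off, len) in zip(fileSizes.domain, offsets, fileSizes) do
  writeln("chunk ", i, ": offset ", off, ", length ", len);
```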
Since @bmcdonald3 is out and none of my comments are blocking, I'll go ahead and merge this. Thanks again Ben!!! EDIT: apparently Ben is in and he wants to hold off for string support.
This PR adds support for a `parallelWriteThreshold` flag that allows a user to determine the size of the Parquet files to be written and then write those files in parallel, appending a `_CORE####` suffix to each file name. By running the Arkouda server with `./arkouda_server --ParquetMsg.parallelWriteThreshold=<num>`, a user is able to control the size of the files that are going to be written. This is currently only supported on pdarrays of natively-supported datatypes (meaning not strings or dataframes), but follow-on work is on the way.
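To make the mechanics concrete, here is a minimal, self-contained sketch of the idea (the proc name `planLocaleWrite` and the threshold value are made up for illustration; this is not the PR's actual code): one locale's chunk is split across `maxTaskPar` files once it crosses the threshold, and each piece gets the `_LOCALE####_CORE####` naming pattern.

```chapel
use IO;   // for string.format

// hypothetical value for illustration; the PR exposes this as
// --ParquetMsg.parallelWriteThreshold=<num> on the server command line
config const parallelWriteThreshold = 1_000_000;

proc planLocaleWrite(filename: string, localeIdx: int, localSize: int, maxTaskPar: int) {
  const doParallel = localSize > parallelWriteThreshold;
  const numFiles = if doParallel then maxTaskPar else 1;

  // split the locale's elements across numFiles; the first file absorbs any remainder
  var fileSizes: [0..#numFiles] int = localSize / numFiles;
  fileSizes[0] += localSize - (localSize / numFiles) * numFiles;

  const suffix = '%04i'.format(localeIdx);
  for (i, len) in zip(fileSizes.domain, fileSizes) {
    const name = if doParallel
      then filename + "_LOCALE" + suffix + "_CORE" + '%04i'.format(i) + ".parquet"
      else filename + "_LOCALE" + suffix + ".parquet";
    writeln(name, " <- ", len, " elements");
  }
}

planLocaleWrite("mydata", 0, 10_000_000, 4);   // four _CORE files of 2,500,000 elements each
```

In this sketch, passing a `localSize` at or below `parallelWriteThreshold` falls back to a single `mydata_LOCALE0000.parquet` file.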
Reviewing PRs from our team today, I came across this one and wondered about its status. My understanding is:

> This PR (closes #2880) adds support for a `parallelWriteThreshold` flag that allows a user to determine the size of the Parquet files to be written and then write those files in parallel, appending a `_CORE####` suffix to each file name. By running the Arkouda server with `./arkouda_server --ParquetMsg.parallelWriteThreshold=<num>`, a user is able to control the size of the files that are going to be written. This is currently only supported on pdarrays of natively-supported datatypes (meaning not strings or dataframes), but follow-on work is on the way.

Tagging @e-kayrakli for awareness and @jhh67 due to his recent Parquet work (albeit in the runtime rather than module-level code).