Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Make output-schema optional on tableWrite #12296

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions velox/py/plan_builder/PyPlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,24 @@ std::optional<PyPlanNode> PyPlanBuilder::planNode() const {
}

PyPlanBuilder& PyPlanBuilder::tableWrite(
const PyType& outputSchema,
const PyFile& outputFile,
const std::string& connectorId) {
const std::string& connectorId,
const std::optional<PyType>& outputSchema) {
exec::test::PlanBuilder::TableWriterBuilder builder(planBuilder_);

// Try to convert the output type.
auto outputRowSchema = asRowType(outputSchema.type());
if (outputRowSchema == nullptr) {
throw std::runtime_error("Output schema must be a ROW().");
RowTypePtr outputRowSchema;

if (outputSchema != std::nullopt) {
outputRowSchema = asRowType(outputSchema->type());

if (outputRowSchema == nullptr) {
throw std::runtime_error("Output schema must be a ROW().");
}
builder.outputType(outputRowSchema);
}

builder.outputType(outputRowSchema)
.outputFileName(outputFile.filePath())
builder.outputFileName(outputFile.filePath())
.fileFormat(outputFile.fileFormat())
.connectorId(connectorId)
.endTableWriter();
Expand Down
9 changes: 5 additions & 4 deletions velox/py/plan_builder/PyPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,16 @@ class PyPlanBuilder {

/// Adds a table writer node to write to an output file(s).
///
/// @param outputSchema The schema to be used when writing the file (columns
/// and types).
/// @param outputFile The output file to be written.
/// @param connectorId The id of the connector to use during the write
/// process.
/// @param outputSchema An optional schema to be used when writing the file
/// (columns and types). By default use the schema produced by the upstream
/// operator.
PyPlanBuilder& tableWrite(
const PyType& outputSchema,
const PyFile& outputFile,
const std::string& connectorId);
const std::string& connectorId,
const std::optional<PyType>& outputSchema);

// Add the provided vectors straight into the operator tree.
PyPlanBuilder& values(const std::vector<PyVector>& values);
Expand Down
7 changes: 4 additions & 3 deletions velox/py/plan_builder/plan_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,18 @@ PYBIND11_MODULE(plan_builder, m) {
.def(
"table_write",
&velox::py::PyPlanBuilder::tableWrite,
py::arg("output_schema"),
py::arg("output_file"),
py::arg("connector_id") = "hive",
py::arg("output_schema") = std::nullopt,
py::doc(R"(
Adds a table write node to the plan.

Args:
output_schema: A RowType containing the schema to be written to
the file.
output_file: Name of the file to be written.
connector_id: ID of the connector to use for this scan.
output_schema: An optional RowType containing the schema to be
written to the file. By default write the schema
produced by the operator upstream.
)"))
.def(
"values",
Expand Down
1 change: 0 additions & 1 deletion velox/py/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def test_write_read_file(self):

plan_builder = PlanBuilder()
plan_builder.values([input_batch]).table_write(
output_schema=ROW(["c0"], [BIGINT()]),
output_file=DWRF(output_file),
connector_id="hive",
)
Expand Down
Loading