Write data streaming to a parquet file #542
Comments
Right now we support streaming reads but not yet streaming writes. That's pending #305.
Thanks! Looking forward to it. Do you know how much work is left to do in that PR?
I haven't looked at that PR in a while. It looks like it needs a little work to be brought up to date with the latest main branch, but aside from that it might work with only a few changes. You can ask @H-Plus-Time if he's interested in working on that PR more.
I don't have a full example myself; let's check with @H-Plus-Time in case he has a minimal example of using this.
The crux of this is: you need something that generates Arrow IPC data first. The below is reasonable for a materialized table:

```js
import * as arrow from 'apache-arrow';
import * as parquet from 'parquet-wasm';

// construct your table (and therefore schema) via apache-arrow (or similar)
const arrowTableInstance = arrow.tableFromArrays({ "sample": [1, 2, 3] });
const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTableInstance)).recordBatches();

// this is what you need (easy in this materialized case)
const streamOfRecordBatches = ReadableStream.from(recordBatches);
const byteStream = await parquet.transformParquetStream(streamOfRecordBatches);
```

In NodeJS, the IO parts:

```js
import { open } from 'fs/promises';
import { Writable } from 'stream';

// open the destination for writing (the default open flag is read-only, so pass "w")
const handle = await open("file.parquet", "w");
const destinationStream = Writable.toWeb(handle.createWriteStream());
await byteStream.pipeTo(destinationStream);
```

Assuming you had something like an async generator that yielded batches of JSON objects, you'd do something like:

```js
const streamOfRecordBatches = new ReadableStream({
  async start(controller) {
    for await (const chunk of yourGeneratorFunction()) {
      // convert each batch of JSON objects into Arrow record batches and enqueue them
      const arrowTable = arrow.tableFromJSON(chunk);
      const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTable)).recordBatches();
      for (const recordBatch of recordBatches) {
        controller.enqueue(recordBatch);
      }
    }
    controller.close();
  },
});
```
The interface at the start of this thread (this: #542 (comment)) would take a bit of adapting, since it's (basically) callback-based. If it happens to be exactly your use case, you'd want this: https://github.com/porsager/postgres?tab=readme-ov-file#await-sqlcursorrows--1-fn as your source async iterable.
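For illustration, here's a rough end-to-end sketch of wiring that cursor into the stream-based approach above. It assumes the porsager/postgres client (the `sql` tagged template and `.cursor(n)` come from its README); the connection options, table name, batch size, and output path are placeholders:

```js
import postgres from 'postgres';
import * as arrow from 'apache-arrow';
import * as parquet from 'parquet-wasm';
import { open } from 'fs/promises';
import { Writable } from 'stream';

const sql = postgres(/* connection options */);

// .cursor(500) turns the query into an async iterable yielding arrays of up to 500 rows
const streamOfRecordBatches = new ReadableStream({
  async start(controller) {
    for await (const rows of sql`select * from my_huge_table`.cursor(500)) {
      // convert each batch of row objects into Arrow record batches and enqueue them
      const arrowTable = arrow.tableFromJSON(rows);
      const recordBatches = parquet.Table.fromIPCStream(arrow.tableToIPC(arrowTable)).recordBatches();
      for (const recordBatch of recordBatches) {
        controller.enqueue(recordBatch);
      }
    }
    controller.close();
  },
});

// encode the record batches as a Parquet byte stream and pipe it to disk
const byteStream = await parquet.transformParquetStream(streamOfRecordBatches);
const handle = await open("export.parquet", "w");
await byteStream.pipeTo(Writable.toWeb(handle.createWriteStream()));
await sql.end();
```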
What?
Hi, at the moment we're using @dsnp/parquetjs to write Parquet files in Node, but it's a fork of an old package and doesn't look very well maintained.
So I came across this repo, which looks very active, but it's not clear to me whether we can do what we're doing now with parquet-wasm. Maybe you can help me understand.
What do we want to do?
We want to iterate over a huge PostgreSQL table with a cursor, so we get batches of rows that we then write out to a Parquet file.
So I was wondering whether that's possible with parquet-wasm: handle the data as a stream and, at the end, save the file to disk.
This is how we do it with @dsnp/parquetjs:
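Roughly, it looks like this (a simplified sketch using the ParquetSchema / ParquetWriter API from the @dsnp/parquetjs README; the columns, output path, and the postgresCursorBatches() helper are placeholders, not our real code):

```js
import parquetjs from '@dsnp/parquetjs';

// hypothetical schema; the real one mirrors the PostgreSQL table's columns
const schema = new parquetjs.ParquetSchema({
  id: { type: 'INT64' },
  name: { type: 'UTF8' },
});

const writer = await parquetjs.ParquetWriter.openFile(schema, 'export.parquet');

// postgresCursorBatches() stands in for our cursor iteration, yielding arrays of row objects
for await (const rows of postgresCursorBatches()) {
  for (const row of rows) {
    await writer.appendRow(row);
  }
}

await writer.close();
```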
Thanks for the help!