Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate py-polars API surface for streaming IPC formats #249

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ features = [
"parquet",
"to_dummies",
"ipc",
"ipc_streaming",
"avro",
"list_eval",
"arg_where",
Expand Down
36 changes: 36 additions & 0 deletions __tests__/io.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,42 @@ describe("ipc", () => {
expect(ipcDF).toFrameEqual(csvDF);
});
});
describe("ipc stream", () => {
  // Every test starts from a fresh IPC stream file produced from the CSV fixture
  // and removes it afterwards so tests stay independent.
  beforeEach(() => {
    pl.readCSV(csvpath).writeIPCStream(ipcpath);
  });
  afterEach(() => {
    fs.rmSync(ipcpath);
  });

  test("read", () => {
    const frame = pl.readIPCStream(ipcpath);
    expect(frame.shape).toEqual({ height: 27, width: 4 });
  });
  test("read/write:buffer", () => {
    // Round-trip entirely in memory: write to a Buffer, read it back.
    const streamed = pl.readCSV(csvpath).writeIPCStream();
    const restored = pl.readIPCStream(streamed);
    expect(restored.shape).toEqual({ height: 27, width: 4 });
  });
  test("read:compressed", () => {
    const fromCsv = pl.readCSV(csvpath);
    fromCsv.writeIPCStream(ipcpath, { compression: "lz4" });
    const fromIpc = pl.readIPCStream(ipcpath);
    expect(fromIpc).toFrameEqual(fromCsv);
  });

  test("read:options", () => {
    // nRows limits how many rows the reader materializes.
    const frame = pl.readIPCStream(ipcpath, { nRows: 4 });
    expect(frame.shape).toEqual({ height: 4, width: 4 });
  });

  test("writeIPCStream", () => {
    const fromCsv = pl.readCSV(csvpath);
    fromCsv.writeIPCStream(ipcpath);
    const fromIpc = pl.readIPCStream(ipcpath);
    expect(fromIpc).toFrameEqual(fromCsv);
  });
});

describe("avro", () => {
beforeEach(() => {
Expand Down
40 changes: 36 additions & 4 deletions polars/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,29 @@ interface WriteMethods {
options?: { format: "lines" | "json" },
): void;
/**
* Write to Arrow IPC binary stream, or a feather file.
* @param file File path to which the file should be written.
* Write to Arrow IPC feather file, either to a file path or to a write stream.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
writeIPC(options?: WriteIPCOptions): Buffer;
writeIPC(destination: string | Writable, options?: WriteIPCOptions): void;

/**
* Write to Arrow IPC stream file, either to a file path or to a write stream.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
writeIPCStream(options?: WriteIPCOptions): Buffer;
writeIPCStream(
destination: string | Writable,
options?: WriteIPCOptions,
): void;

/**
* Write the DataFrame to disk in parquet format.
* @param file File path to which the file should be written.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
Expand All @@ -163,7 +175,7 @@ interface WriteMethods {

/**
* Write the DataFrame to disk in avro format.
* @param file File path to which the file should be written.
* @param destination File path to which the file should be written, or writable.
* @param options.compression Compression method *defaults to "uncompressed"*
* @category IO
*/
Expand Down Expand Up @@ -2511,6 +2523,26 @@ export const _DataFrame = (_df: any): DataFrame => {

return Buffer.concat(buffers);
},
writeIPCStream(dest?, options = { compression: "uncompressed" }) {
  // Explicit destination (path or writable): delegate straight to the native writer.
  if (typeof dest === "string" || dest instanceof Writable) {
    return _df.writeIpcStream(dest, options.compression) as any;
  }

  // No destination: capture the stream chunks and return them as one Buffer.
  // When invoked as writeIPCStream(options), `dest` holds the options bag,
  // hence the `dest?.compression` fallback below.
  const chunks: Buffer[] = [];
  const sink = new Stream.Writable({
    write(chunk, _encoding, done) {
      chunks.push(chunk);
      done(null);
    },
  });

  const compression = dest?.compression ?? options?.compression;
  _df.writeIpcStream(sink, compression);
  sink.end("");

  return Buffer.concat(chunks);
},
toSeries: (index = 0) => _Series(_df.selectAtIdx(index) as any) as any,
toStruct(name) {
return _Series(_df.toStruct(name));
Expand Down
2 changes: 2 additions & 0 deletions polars/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export namespace pl {
export import readRecords = io.readRecords;
export import readCSV = io.readCSV;
export import readIPC = io.readIPC;
export import readIPCStream = io.readIPCStream;
export import readJSON = io.readJSON;
export import readParquet = io.readParquet;
export import readAvro = io.readAvro;
Expand Down Expand Up @@ -188,6 +189,7 @@ export import scanParquet = io.scanParquet;
export import readRecords = io.readRecords;
export import readCSV = io.readCSV;
export import readIPC = io.readIPC;
export import readIPCStream = io.readIPCStream;
export import readJSON = io.readJSON;
export import readParquet = io.readParquet;
export import readAvro = io.readAvro;
Expand Down
34 changes: 32 additions & 2 deletions polars/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ export interface ReadIPCOptions {
}

/**
* __Read into a DataFrame from Arrow IPC (Feather v2) file.__
* __Read into a DataFrame from Arrow IPC file (Feather v2).__
* ___
* @param pathOrBody - path or buffer or string
* - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
Expand All @@ -558,14 +558,44 @@ export function readIPC(pathOrBody, options = {}) {
throw new Error("must supply either a path or body");
}

/**
 * __Read into a DataFrame from an Arrow IPC record batch stream.__
 * ___
 * @param pathOrBody - path or buffer or string
 *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
 *   - body: String or buffer to be read as an Arrow IPC stream
 * @param options.columns Columns to select. Accepts a list of column names.
 * @param options.nRows Stop reading from the IPC stream after reading ``nRows``.
 */
export function readIPCStream(
  pathOrBody: string | Buffer,
  options?: Partial<ReadIPCOptions>,
): DataFrame;
export function readIPCStream(pathOrBody, options = {}) {
  if (Buffer.isBuffer(pathOrBody)) {
    return _DataFrame(pli.readIpcStream(pathOrBody, options));
  }

  if (typeof pathOrBody === "string") {
    // Strings that don't look like an .ipc file path are treated as inline
    // IPC payloads and handed to the reader as a buffer.
    const source = isPath(pathOrBody, [".ipc"])
      ? pathOrBody
      : Buffer.from(pathOrBody, "utf-8");
    return _DataFrame(pli.readIpcStream(source, options));
  }
  throw new Error("must supply either a path or body");
}

export interface ScanIPCOptions {
nRows: number;
cache: boolean;
rechunk: boolean;
}

/**
* __Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.__
* __Lazily read from an Arrow IPC file (Feather v2) or multiple files via glob patterns.__
* ___
* @param path Path to a IPC file.
* @param options.nRows Stop reading from IPC file after reading ``nRows``
Expand Down
69 changes: 69 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,43 @@ pub fn read_ipc(
Ok(JsDataFrame::new(df))
}

#[napi(catch_unwind)]
/// Read an Arrow IPC record batch stream (from a file path or an in-memory
/// buffer) into a `JsDataFrame`, honoring the projection / column / row-limit
/// / row-index options.
pub fn read_ipc_stream(
    path_or_buffer: Either<String, Buffer>,
    options: ReadIpcOptions,
) -> napi::Result<JsDataFrame> {
    // Normalize the JS-side option types into what the reader expects.
    let selected_columns = options.columns;
    let projected_indices: Option<Vec<usize>> = options
        .projection
        .map(|idxs| idxs.into_iter().map(|idx| idx as usize).collect());
    let row_index = options.row_count.map(|rc| rc.into());
    let limit = options.n_rows.map(|n| n as usize);

    // The two sources yield readers of different concrete types, so each match
    // arm drives its own builder chain to completion.
    let parsed = match path_or_buffer {
        Either::A(path) => IpcStreamReader::new(BufReader::new(File::open(&path)?))
            .with_projection(projected_indices)
            .with_columns(selected_columns)
            .with_n_rows(limit)
            .with_row_index(row_index)
            .finish(),
        Either::B(buf) => IpcStreamReader::new(Cursor::new(buf.as_ref()))
            .with_projection(projected_indices)
            .with_columns(selected_columns)
            .with_n_rows(limit)
            .with_row_index(row_index)
            .finish(),
    };

    Ok(JsDataFrame::new(parsed.map_err(JsPolarsErr::from)?))
}

#[napi(object)]
pub struct ReadAvroOptions {
pub columns: Option<Vec<String>>,
Expand Down Expand Up @@ -1426,6 +1463,38 @@ impl JsDataFrame {
Ok(())
}
#[napi(catch_unwind)]
pub fn write_ipc_stream(
&mut self,
path_or_buffer: JsUnknown,
compression: Wrap<Option<IpcCompression>>,
env: Env,
) -> napi::Result<()> {
let compression = compression.0;

match path_or_buffer.get_type()? {
ValueType::String => {
let path: napi::JsString = unsafe { path_or_buffer.cast() };
let path = path.into_utf8()?.into_owned()?;
let f = std::fs::File::create(path).unwrap();
let f = BufWriter::new(f);
IpcStreamWriter::new(f)
.with_compression(compression)
.finish(&mut self.df)
.map_err(JsPolarsErr::from)?;
}
ValueType::Object => {
let inner: napi::JsObject = unsafe { path_or_buffer.cast() };
let writeable = JsWriteStream { inner, env: &env };
IpcStreamWriter::new(writeable)
.with_compression(compression)
.finish(&mut self.df)
.map_err(JsPolarsErr::from)?;
}
_ => panic!(),
};
Ok(())
}
#[napi(catch_unwind)]
pub fn write_json(
&mut self,
path_or_buffer: JsUnknown,
Expand Down