From 1f8cdc789abea6f8b53e6dd26c7e38ce991307d6 Mon Sep 17 00:00:00 2001 From: KyJah Keys Date: Fri, 8 Mar 2024 08:01:25 -0500 Subject: [PATCH] feat: add insert method that accepts IArrowArrayStream --- src/DeltaLake/Bridge/Cargo.lock | 4 +- src/DeltaLake/Bridge/Table.cs | 77 ++++++++++++---------- src/DeltaLake/Table/DeltaTable.cs | 24 +++++++ tests/DeltaLake.Tests/Table/InsertTests.cs | 38 +++++++++++ 4 files changed, 105 insertions(+), 38 deletions(-) diff --git a/src/DeltaLake/Bridge/Cargo.lock b/src/DeltaLake/Bridge/Cargo.lock index 6ba8a02..8140bed 100644 --- a/src/DeltaLake/Bridge/Cargo.lock +++ b/src/DeltaLake/Bridge/Cargo.lock @@ -1951,9 +1951,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opaque-debug" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "openssl-probe" diff --git a/src/DeltaLake/Bridge/Table.cs b/src/DeltaLake/Bridge/Table.cs index 1c08d9b..94f92b1 100644 --- a/src/DeltaLake/Bridge/Table.cs +++ b/src/DeltaLake/Bridge/Table.cs @@ -217,54 +217,59 @@ public async Task InsertAsync( return string.Empty; } + using var stream = new RecordBatchReader(records, schema); + return await InsertAsync(stream, options, cancellationToken).ConfigureAwait(false); + } + + public async Task InsertAsync( + IArrowArrayStream stream, + InsertOptions options, + ICancellationToken cancellationToken) + { var tsc = new TaskCompletionSource(); using (var scope = new Scope()) { - using (var stream = new RecordBatchReader(records, schema)) + unsafe { - unsafe + var ffiStream = CArrowArrayStream.Create(); + CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream); + Interop.Methods.table_insert( + _runtime.Ptr, + _ptr, + ffiStream, + scope.Pointer(scope.ByteArray(options.Predicate)), + scope.Pointer(ConvertSaveMode(options.SaveMode).Ref), + new UIntPtr(options.MaxRowsPerGroup), + (byte)(options.OverwriteSchema ? 1 : 0), + scope.CancellationToken(cancellationToken), + scope.FunctionPointer((success, fail) => { - var ffiStream = CArrowArrayStream.Create(); - CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream); - Interop.Methods.table_insert( - _runtime.Ptr, - _ptr, - ffiStream, - scope.Pointer(scope.ByteArray(options.Predicate)), - scope.Pointer(ConvertSaveMode(options.SaveMode).Ref), - new UIntPtr(options.MaxRowsPerGroup), - (byte)(options.OverwriteSchema ? 1 : 0), - scope.CancellationToken(cancellationToken), - scope.FunctionPointer((success, fail) => + try { - try + if (cancellationToken.IsCancellationRequested) { - if (cancellationToken.IsCancellationRequested) - { - tsc.TrySetCanceled(cancellationToken); - return; - } + tsc.TrySetCanceled(cancellationToken); + return; + } - if (fail != null) - { - tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail)); - } - else - { - tsc.TrySetResult("{}"); - } + if (fail != null) + { + tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail)); } - finally + else { - CArrowArrayStream.Free(ffiStream); - stream.Dispose(); + tsc.TrySetResult("{}"); } - })); - } - - - return await tsc.Task.ConfigureAwait(false); + } + finally + { + CArrowArrayStream.Free(ffiStream); + stream.Dispose(); + } + })); } + + return await tsc.Task.ConfigureAwait(false); } } diff --git a/src/DeltaLake/Table/DeltaTable.cs b/src/DeltaLake/Table/DeltaTable.cs index edb20a4..f8d32d0 100644 --- a/src/DeltaLake/Table/DeltaTable.cs +++ b/src/DeltaLake/Table/DeltaTable.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using Apache.Arrow; +using Apache.Arrow.Ipc; using DeltaLake.Errors; using DeltaLake.Runtime; @@ -153,6 +154,29 @@ public async Task InsertAsync( await _table.InsertAsync(records, schema, options, cancellationToken).ConfigureAwait(false); } + /// + /// Inserts a record batch into the table based upon the provided options + /// + /// A collection of records to insert + /// + /// + /// + public async Task InsertAsync( + IArrowArrayStream records, + InsertOptions options, + CancellationToken cancellationToken) + { + + if (!options.IsValid) + { + throw new DeltaConfigurationException( + "Invalid InsertOptions", + new ArgumentException("configuration is invalid", nameof(options))); + } + + await _table.InsertAsync(records, options, cancellationToken).ConfigureAwait(false); + } + /// /// Returns the table history /// diff --git a/tests/DeltaLake.Tests/Table/InsertTests.cs b/tests/DeltaLake.Tests/Table/InsertTests.cs index dfb1783..f65a378 100644 --- a/tests/DeltaLake.Tests/Table/InsertTests.cs +++ b/tests/DeltaLake.Tests/Table/InsertTests.cs @@ -43,6 +43,22 @@ public async Task Memory_Insert_Zero_Record_Count_Test() Assert.Equal(version, table.Version()); } + [Fact] + public async Task Insert_Stream_Test() + { + var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0); + using var runtime = tableParts.runtime; + using var table = tableParts.table; + var version = table.Version(); + var rb = new[] { + TableHelpers.BuildBasicRecordBatch(10), + }; + var schema = table.Schema(); + using var reader = new Bridge.RecordBatchReader(rb, schema); + await table.InsertAsync(reader, new InsertOptions(), CancellationToken.None); + Assert.Equal(version + 1, table.Version()); + } + [Fact] public async Task Memory_Insert_Will_Cancel_Test() { @@ -101,4 +117,26 @@ private async Task BaseInsertTest(string path, int length) Assert.Empty(queryResult); } } + + private async Task StreamInsertTest(string path, int length) + { + var data = await TableHelpers.SetupTable(path, length); + using var runtime = data.runtime; + using var table = data.table; + var queryResult = table.QueryAsync(new SelectQuery("SELECT test FROM test WHERE test > 1") + { + TableAlias = "test", + }, + CancellationToken.None).ToBlockingEnumerable().ToList(); + + if (length > 2) + { + var totalRecords = queryResult.Select(s => s.Length).Sum(); + Assert.Equal(length - 2, totalRecords); + } + else + { + Assert.Empty(queryResult); + } + } } \ No newline at end of file