Skip to content

Commit

Permalink
feat: add insert method that accepts IArrowArrayStream
Browse files Browse the repository at this point in the history
  • Loading branch information
mightyshazam committed Mar 8, 2024
1 parent 3b4cef6 commit 1f8cdc7
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/DeltaLake/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 41 additions & 36 deletions src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,54 +217,59 @@ public async Task<string> InsertAsync(
return string.Empty;
}

using var stream = new RecordBatchReader(records, schema);
return await InsertAsync(stream, options, cancellationToken).ConfigureAwait(false);
}

public async Task<string> InsertAsync(
IArrowArrayStream stream,
InsertOptions options,
ICancellationToken cancellationToken)
{
var tsc = new TaskCompletionSource<string>();
using (var scope = new Scope())
{
using (var stream = new RecordBatchReader(records, schema))
unsafe
{
unsafe
var ffiStream = CArrowArrayStream.Create();
CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream);
Interop.Methods.table_insert(
_runtime.Ptr,
_ptr,
ffiStream,
scope.Pointer(scope.ByteArray(options.Predicate)),
scope.Pointer(ConvertSaveMode(options.SaveMode).Ref),
new UIntPtr(options.MaxRowsPerGroup),
(byte)(options.OverwriteSchema ? 1 : 0),
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.GenericErrorCallback>((success, fail) =>
{
var ffiStream = CArrowArrayStream.Create();
CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream);
Interop.Methods.table_insert(
_runtime.Ptr,
_ptr,
ffiStream,
scope.Pointer(scope.ByteArray(options.Predicate)),
scope.Pointer(ConvertSaveMode(options.SaveMode).Ref),
new UIntPtr(options.MaxRowsPerGroup),
(byte)(options.OverwriteSchema ? 1 : 0),
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.GenericErrorCallback>((success, fail) =>
try
{
try
if (cancellationToken.IsCancellationRequested)
{
if (cancellationToken.IsCancellationRequested)
{
tsc.TrySetCanceled(cancellationToken);
return;
}
tsc.TrySetCanceled(cancellationToken);
return;
}
if (fail != null)
{
tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
}
else
{
tsc.TrySetResult("{}");
}
if (fail != null)
{
tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
}
finally
else
{
CArrowArrayStream.Free(ffiStream);
stream.Dispose();
tsc.TrySetResult("{}");
}
}));
}


return await tsc.Task.ConfigureAwait(false);
}
finally
{
CArrowArrayStream.Free(ffiStream);
stream.Dispose();
}
}));
}

return await tsc.Task.ConfigureAwait(false);
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/DeltaLake/Table/DeltaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Ipc;
using DeltaLake.Errors;
using DeltaLake.Runtime;

Expand Down Expand Up @@ -153,6 +154,29 @@ public async Task InsertAsync(
await _table.InsertAsync(records, schema, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Inserts a record batch into the table based upon the provided options
/// </summary>
/// <param name="records">A collection of <see cref="IArrowArrayStream"/> records to insert</param>
/// <param name="options"><see cref="InsertOptions"/> </param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> </param>
/// <returns><see cref="Task"/></returns>
public async Task InsertAsync(
IArrowArrayStream records,
InsertOptions options,
CancellationToken cancellationToken)
{

if (!options.IsValid)
{
throw new DeltaConfigurationException(
"Invalid InsertOptions",
new ArgumentException("configuration is invalid", nameof(options)));
}

await _table.InsertAsync(records, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Returns the table history
/// </summary>
Expand Down
38 changes: 38 additions & 0 deletions tests/DeltaLake.Tests/Table/InsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ public async Task Memory_Insert_Zero_Record_Count_Test()
Assert.Equal(version, table.Version());
}

[Fact]
public async Task Insert_Stream_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
var version = table.Version();
var rb = new[] {
TableHelpers.BuildBasicRecordBatch(10),
};
var schema = table.Schema();
using var reader = new Bridge.RecordBatchReader(rb, schema);
await table.InsertAsync(reader, new InsertOptions(), CancellationToken.None);
Assert.Equal(version + 1, table.Version());
}

[Fact]
public async Task Memory_Insert_Will_Cancel_Test()
{
Expand Down Expand Up @@ -101,4 +117,26 @@ private async Task BaseInsertTest(string path, int length)
Assert.Empty(queryResult);
}
}

private async Task StreamInsertTest(string path, int length)
{
var data = await TableHelpers.SetupTable(path, length);
using var runtime = data.runtime;
using var table = data.table;
var queryResult = table.QueryAsync(new SelectQuery("SELECT test FROM test WHERE test > 1")
{
TableAlias = "test",
},
CancellationToken.None).ToBlockingEnumerable().ToList();

if (length > 2)
{
var totalRecords = queryResult.Select(s => s.Length).Sum();
Assert.Equal(length - 2, totalRecords);
}
else
{
Assert.Empty(queryResult);
}
}
}

0 comments on commit 1f8cdc7

Please sign in to comment.