Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add insert method that accepts IArrowArrayStream #16

Merged
merged 2 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,5 @@ jobs:
src/DeltaLake/bin/Release/*.snupkg
- name: Push nuget artifacts
run: |
dotnet nuget add source --username USERNAME --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/mightyshazam/index.json"
dotnet nuget add source --username USERNAME --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/delta-incubator/index.json"
dotnet nuget push .\src\DeltaLake\bin\Release\DeltaLake.*.nupkg --api-key ${{ secrets.GITHUB_TOKEN }} --source "github"
80 changes: 80 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,81 @@
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [Quick Start](#quick-start)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

This package is a C# wrapper around [delta-rs](https://github.com/delta-io/delta-rs/tree/rust-v0.17.0).

It uses the [tokio-rs](https://tokio.rs/) runtime to provide asynchronous behavior. This allows the usage of .NET Tasks and async/await to take advantage of the same behavior provided by the underlying rust library.
This library also takes advantage of the [Apache Arrow](https://github.com/apache/arrow/blob/main/csharp/README.md) C# IPC formats to minimize the amount of copying required to move data between runtimes.

![alt text](architecture_simple.png "Using a Rust bridge library with .NET p/invoke")

The bridge library incorporates delta-rs and [tokio-rs](https://tokio.rs/) as shown in the image below.
![alt text](architecture_expanded.png "Rust bridge library with tokio")

## Quick Start

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;
using DeltaLake.Runtime;
using DeltaLake.Table;


public static Runtime CreateRuntime()
{
return new DeltaRuntime(RuntimeOptions.Default);
}

public static Task<DeltaTable> CreateDeltaTable(
Runtime runtime,
string path,
CancellationToken cancellationToken
)
{
var builder = new Apache.Arrow.Schema.Builder();
builder.Field(fb =>
{
fb.Name("test");
fb.DataType(Int32Type.Default);
fb.Nullable(false);
});
var schema = builder.Build();
return DeltaTable.CreateAsync(
runtime,
new TableCreateOptions(uri, schema)
{
Configuration = new Dictionary<string, string?>(),
},
cancellationToken);
}

public static Task<DeltaTable, Runtime> InsertIntoTable(
DeltaTable table,
CancellationToken cancellationToken)
{
var allocator = new NativeMemoryAllocator();
var recordBatchBuilder = new RecordBatch.Builder(allocator)
.Append(
"test",
false,
col => col.Int32(arr => arr.AppendRange(Enumerable.Range(0, length))));
var options = new InsertOptions
{
SaveMode = SaveMode.Append,
};
await table.InsertAsync(
[recordBatchBuilder.Build()],
schema,
options,
cancellationToken);
}
```
Binary file added architecture_expanded.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added architecture_simple.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
47 changes: 47 additions & 0 deletions examples/local/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;
using DeltaLake.Runtime;
using DeltaLake.Table;

namespace local;

public class Program
{
public static async Task Main(string[] args)
{
var uri = args[0];
using var runtime = new DeltaRuntime(RuntimeOptions.Default);
var builder = new Apache.Arrow.Schema.Builder();
builder.Field(fb =>
{
fb.Name("test");
fb.DataType(Int32Type.Default);
fb.Nullable(false);
});
var schema = builder.Build();
var allocator = new NativeMemoryAllocator();
var recordBatchBuilder = new RecordBatch.Builder(allocator)
.Append("test", false, col => col.Int32(arr => arr.AppendRange(Enumerable.Range(0, length))));
using var table = await DeltaTable.CreateAsync(
runtime,
new TableCreateOptions(uri, schema)
{
Configuration = new Dictionary<string, string?>
{
["delta.dataSkippingNumIndexedCols"] = "32",
["delta.setTransactionRetentionDuration"] = null,
}
},
CancellationToken.None);
var options = new InsertOptions
{
SaveMode = SaveMode.Append,
};
await table.InsertAsync([recordBatchBuilder.Build()], schema, options, CancellationToken.None);
}
}
12 changes: 12 additions & 0 deletions examples/local/local.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DeltaLake\DeltaLake.csproj" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions src/DeltaLake/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 41 additions & 36 deletions src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,54 +217,59 @@ public async Task<string> InsertAsync(
return string.Empty;
}

using var stream = new RecordBatchReader(records, schema);
return await InsertAsync(stream, options, cancellationToken).ConfigureAwait(false);
}

public async Task<string> InsertAsync(
IArrowArrayStream stream,
InsertOptions options,
ICancellationToken cancellationToken)
{
var tsc = new TaskCompletionSource<string>();
using (var scope = new Scope())
{
using (var stream = new RecordBatchReader(records, schema))
unsafe
{
unsafe
var ffiStream = CArrowArrayStream.Create();
CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream);
Interop.Methods.table_insert(
_runtime.Ptr,
_ptr,
ffiStream,
scope.Pointer(scope.ByteArray(options.Predicate)),
scope.Pointer(ConvertSaveMode(options.SaveMode).Ref),
new UIntPtr(options.MaxRowsPerGroup),
(byte)(options.OverwriteSchema ? 1 : 0),
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.GenericErrorCallback>((success, fail) =>
{
var ffiStream = CArrowArrayStream.Create();
CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream);
Interop.Methods.table_insert(
_runtime.Ptr,
_ptr,
ffiStream,
scope.Pointer(scope.ByteArray(options.Predicate)),
scope.Pointer(ConvertSaveMode(options.SaveMode).Ref),
new UIntPtr(options.MaxRowsPerGroup),
(byte)(options.OverwriteSchema ? 1 : 0),
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.GenericErrorCallback>((success, fail) =>
try
{
try
if (cancellationToken.IsCancellationRequested)
{
if (cancellationToken.IsCancellationRequested)
{
tsc.TrySetCanceled(cancellationToken);
return;
}
tsc.TrySetCanceled(cancellationToken);
return;
}

if (fail != null)
{
tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
}
else
{
tsc.TrySetResult("{}");
}
if (fail != null)
{
tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
}
finally
else
{
CArrowArrayStream.Free(ffiStream);
stream.Dispose();
tsc.TrySetResult("{}");
}
}));
}


return await tsc.Task.ConfigureAwait(false);
}
finally
{
CArrowArrayStream.Free(ffiStream);
stream.Dispose();
}
}));
}

return await tsc.Task.ConfigureAwait(false);
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/DeltaLake/Table/DeltaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Ipc;
using DeltaLake.Errors;
using DeltaLake.Runtime;

Expand Down Expand Up @@ -153,6 +154,29 @@ public async Task InsertAsync(
await _table.InsertAsync(records, schema, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Inserts a record batch into the table based upon the provided options
/// </summary>
/// <param name="records">A collection of <see cref="IArrowArrayStream"/> records to insert</param>
/// <param name="options"><see cref="InsertOptions"/> </param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> </param>
/// <returns><see cref="Task"/></returns>
public async Task InsertAsync(
IArrowArrayStream records,
InsertOptions options,
CancellationToken cancellationToken)
{

if (!options.IsValid)
{
throw new DeltaConfigurationException(
"Invalid InsertOptions",
new ArgumentException("configuration is invalid", nameof(options)));
}

await _table.InsertAsync(records, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Returns the table history
/// </summary>
Expand Down
38 changes: 38 additions & 0 deletions tests/DeltaLake.Tests/Table/InsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ public async Task Memory_Insert_Zero_Record_Count_Test()
Assert.Equal(version, table.Version());
}

[Fact]
public async Task Insert_Stream_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
var version = table.Version();
var rb = new[] {
TableHelpers.BuildBasicRecordBatch(10),
};
var schema = table.Schema();
using var reader = new Bridge.RecordBatchReader(rb, schema);
await table.InsertAsync(reader, new InsertOptions(), CancellationToken.None);
Assert.Equal(version + 1, table.Version());
}

[Fact]
public async Task Memory_Insert_Will_Cancel_Test()
{
Expand Down Expand Up @@ -101,4 +117,26 @@ private async Task BaseInsertTest(string path, int length)
Assert.Empty(queryResult);
}
}

private async Task StreamInsertTest(string path, int length)
{
var data = await TableHelpers.SetupTable(path, length);
using var runtime = data.runtime;
using var table = data.table;
var queryResult = table.QueryAsync(new SelectQuery("SELECT test FROM test WHERE test > 1")
{
TableAlias = "test",
},
CancellationToken.None).ToBlockingEnumerable().ToList();

if (length > 2)
{
var totalRecords = queryResult.Select(s => s.Length).Sum();
Assert.Equal(length - 2, totalRecords);
}
else
{
Assert.Empty(queryResult);
}
}
}
Loading