Skip to content

Commit

Permalink
Merge pull request #16 from delta-incubator/update-delta-rs
Browse files Browse the repository at this point in the history
updated nuget package reference
  • Loading branch information
mightyshazam authored Mar 10, 2024
2 parents 4e575c4 + 1e8de08 commit 16e6b5c
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,5 @@ jobs:
src/DeltaLake/bin/Release/*.snupkg
- name: Push nuget artifacts
run: |
dotnet nuget add source --username USERNAME --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/mightyshazam/index.json"
dotnet nuget add source --username USERNAME --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/delta-incubator/index.json"
dotnet nuget push .\src\DeltaLake\bin\Release\DeltaLake.*.nupkg --api-key ${{ secrets.GITHUB_TOKEN }} --source "github"
80 changes: 80 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,81 @@
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [Quick Start](#quick-start)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

This package is a C# wrapper around [delta-rs](https://github.com/delta-io/delta-rs/tree/rust-v0.17.0).

It uses the [tokio-rs](https://tokio.rs/) runtime to provide asynchronous behavior. This allows the usage of .NET Tasks and async/await to take advantage of the same behavior provided by the underlying rust library.
This library also takes advantage of the [Apache Arrow](https://github.com/apache/arrow/blob/main/csharp/README.md) C# IPC formats to minimize the amount of copying required to move data between runtimes.

![alt text](architecture_simple.png "Using a Rust bridge library with .NET p/invoke")

The bridge library incorporates delta-rs and [tokio-rs](https://tokio.rs/) as shown in the image below.
![alt text](architecture_expanded.png "Rust bridge library with tokio")

## Quick Start

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;
using DeltaLake.Runtime;
using DeltaLake.Table;


public static Runtime CreateRuntime()
{
return new DeltaRuntime(RuntimeOptions.Default);
}

public static Task<DeltaTable> CreateDeltaTable(
Runtime runtime,
string path,
CancellationToken cancellationToken
)
{
var builder = new Apache.Arrow.Schema.Builder();
builder.Field(fb =>
{
fb.Name("test");
fb.DataType(Int32Type.Default);
fb.Nullable(false);
});
var schema = builder.Build();
return DeltaTable.CreateAsync(
runtime,
new TableCreateOptions(uri, schema)
{
Configuration = new Dictionary<string, string?>(),
},
cancellationToken);
}

public static Task<DeltaTable, Runtime> InsertIntoTable(
DeltaTable table,
CancellationToken cancellationToken)
{
var allocator = new NativeMemoryAllocator();
var recordBatchBuilder = new RecordBatch.Builder(allocator)
.Append(
"test",
false,
col => col.Int32(arr => arr.AppendRange(Enumerable.Range(0, length))));
var options = new InsertOptions
{
SaveMode = SaveMode.Append,
};
await table.InsertAsync(
[recordBatchBuilder.Build()],
schema,
options,
cancellationToken);
}
```
Binary file added architecture_expanded.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added architecture_simple.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
47 changes: 47 additions & 0 deletions examples/local/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;
using DeltaLake.Runtime;
using DeltaLake.Table;

namespace local;

public class Program
{
public static async Task Main(string[] args)
{
var uri = args[0];
using var runtime = new DeltaRuntime(RuntimeOptions.Default);
var builder = new Apache.Arrow.Schema.Builder();
builder.Field(fb =>
{
fb.Name("test");
fb.DataType(Int32Type.Default);
fb.Nullable(false);
});
var schema = builder.Build();
var allocator = new NativeMemoryAllocator();
var recordBatchBuilder = new RecordBatch.Builder(allocator)
.Append("test", false, col => col.Int32(arr => arr.AppendRange(Enumerable.Range(0, length))));
using var table = await DeltaTable.CreateAsync(
runtime,
new TableCreateOptions(uri, schema)
{
Configuration = new Dictionary<string, string?>
{
["delta.dataSkippingNumIndexedCols"] = "32",
["delta.setTransactionRetentionDuration"] = null,
}
},
CancellationToken.None);
var options = new InsertOptions
{
SaveMode = SaveMode.Append,
};
await table.InsertAsync([recordBatchBuilder.Build()], schema, options, CancellationToken.None);
}
}
12 changes: 12 additions & 0 deletions examples/local/local.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DeltaLake\DeltaLake.csproj" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions src/DeltaLake/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 41 additions & 36 deletions src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,54 +217,59 @@ public async Task<string> InsertAsync(
return string.Empty;
}

using var stream = new RecordBatchReader(records, schema);
return await InsertAsync(stream, options, cancellationToken).ConfigureAwait(false);
}

public async Task<string> InsertAsync(
IArrowArrayStream stream,
InsertOptions options,
ICancellationToken cancellationToken)
{
var tsc = new TaskCompletionSource<string>();
using (var scope = new Scope())
{
using (var stream = new RecordBatchReader(records, schema))
unsafe
{
unsafe
var ffiStream = CArrowArrayStream.Create();
CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream);
Interop.Methods.table_insert(
_runtime.Ptr,
_ptr,
ffiStream,
scope.Pointer(scope.ByteArray(options.Predicate)),
scope.Pointer(ConvertSaveMode(options.SaveMode).Ref),
new UIntPtr(options.MaxRowsPerGroup),
(byte)(options.OverwriteSchema ? 1 : 0),
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.GenericErrorCallback>((success, fail) =>
{
var ffiStream = CArrowArrayStream.Create();
CArrowArrayStreamExporter.ExportArrayStream(stream, ffiStream);
Interop.Methods.table_insert(
_runtime.Ptr,
_ptr,
ffiStream,
scope.Pointer(scope.ByteArray(options.Predicate)),
scope.Pointer(ConvertSaveMode(options.SaveMode).Ref),
new UIntPtr(options.MaxRowsPerGroup),
(byte)(options.OverwriteSchema ? 1 : 0),
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.GenericErrorCallback>((success, fail) =>
try
{
try
if (cancellationToken.IsCancellationRequested)
{
if (cancellationToken.IsCancellationRequested)
{
tsc.TrySetCanceled(cancellationToken);
return;
}
tsc.TrySetCanceled(cancellationToken);
return;
}
if (fail != null)
{
tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
}
else
{
tsc.TrySetResult("{}");
}
if (fail != null)
{
tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
}
finally
else
{
CArrowArrayStream.Free(ffiStream);
stream.Dispose();
tsc.TrySetResult("{}");
}
}));
}


return await tsc.Task.ConfigureAwait(false);
}
finally
{
CArrowArrayStream.Free(ffiStream);
stream.Dispose();
}
}));
}

return await tsc.Task.ConfigureAwait(false);
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/DeltaLake/Table/DeltaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Ipc;
using DeltaLake.Errors;
using DeltaLake.Runtime;

Expand Down Expand Up @@ -153,6 +154,29 @@ public async Task InsertAsync(
await _table.InsertAsync(records, schema, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Inserts a record batch into the table based upon the provided options
/// </summary>
/// <param name="records">A collection of <see cref="IArrowArrayStream"/> records to insert</param>
/// <param name="options"><see cref="InsertOptions"/> </param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> </param>
/// <returns><see cref="Task"/></returns>
public async Task InsertAsync(
IArrowArrayStream records,
InsertOptions options,
CancellationToken cancellationToken)
{

if (!options.IsValid)
{
throw new DeltaConfigurationException(
"Invalid InsertOptions",
new ArgumentException("configuration is invalid", nameof(options)));
}

await _table.InsertAsync(records, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Returns the table history
/// </summary>
Expand Down
38 changes: 38 additions & 0 deletions tests/DeltaLake.Tests/Table/InsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ public async Task Memory_Insert_Zero_Record_Count_Test()
Assert.Equal(version, table.Version());
}

[Fact]
public async Task Insert_Stream_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
var version = table.Version();
var rb = new[] {
TableHelpers.BuildBasicRecordBatch(10),
};
var schema = table.Schema();
using var reader = new Bridge.RecordBatchReader(rb, schema);
await table.InsertAsync(reader, new InsertOptions(), CancellationToken.None);
Assert.Equal(version + 1, table.Version());
}

[Fact]
public async Task Memory_Insert_Will_Cancel_Test()
{
Expand Down Expand Up @@ -101,4 +117,26 @@ private async Task BaseInsertTest(string path, int length)
Assert.Empty(queryResult);
}
}

private async Task StreamInsertTest(string path, int length)
{
var data = await TableHelpers.SetupTable(path, length);
using var runtime = data.runtime;
using var table = data.table;
var queryResult = table.QueryAsync(new SelectQuery("SELECT test FROM test WHERE test > 1")
{
TableAlias = "test",
},
CancellationToken.None).ToBlockingEnumerable().ToList();

if (length > 2)
{
var totalRecords = queryResult.Select(s => s.Length).Sum();
Assert.Equal(length - 2, totalRecords);
}
else
{
Assert.Empty(queryResult);
}
}
}

0 comments on commit 16e6b5c

Please sign in to comment.