Skip to content

Commit

Permalink
added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mightyshazam committed Feb 16, 2024
1 parent 69f8269 commit 1a52a95
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/DeltaLake/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ internal static unsafe partial class Methods
public static extern void history([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("uintptr_t")] UIntPtr limit, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("GenericErrorCallback")] IntPtr callback);

[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_update_incremental([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback);
public static extern void table_update_incremental([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("int64_t")] long max_version, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback);

[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_load_version([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("int64_t")] long version, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback);
Expand All @@ -396,7 +396,7 @@ internal static unsafe partial class Methods
public static extern byte table_load_with_datetime([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("int64_t")] long ts_milliseconds, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback);

[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_merge([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* delta_table, [NativeTypeName("struct ByteArrayRef * _Nonnull")] ByteArrayRef* query, void* stream, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("GenericErrorCallback")] IntPtr callback);
public static extern void table_merge([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* delta_table, [NativeTypeName("struct ByteArrayRef * _Nonnull")] ByteArrayRef* query, [NativeTypeName("void * _Nonnull")] void* stream, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("GenericErrorCallback")] IntPtr callback);

[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ProtocolResponse")]
Expand Down
8 changes: 7 additions & 1 deletion src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,11 @@ public async Task<byte[]> HistoryAsync(ulong limit, ICancellationToken cancellat

public async Task AddConstraintAsync(IReadOnlyDictionary<string, string> constraints, IReadOnlyDictionary<string, string>? customMetadata, ICancellationToken cancellationToken)
{
if (constraints.Count == 0)
{
return;
}

var tsc = new TaskCompletionSource<bool>();
using (var scope = new Scope())
{
Expand Down Expand Up @@ -506,7 +511,7 @@ public async Task AddConstraintAsync(IReadOnlyDictionary<string, string> constra
}
}

public async Task UpdateIncrementalAsync(ICancellationToken cancellationToken)
public async Task UpdateIncrementalAsync(long? maxVersion, ICancellationToken cancellationToken)
{
var tsc = new TaskCompletionSource<bool>();
using (var scope = new Scope())
Expand All @@ -516,6 +521,7 @@ public async Task UpdateIncrementalAsync(ICancellationToken cancellationToken)
Methods.table_update_incremental(
_runtime.Ptr,
_ptr,
maxVersion ?? -1L,
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.TableEmptyCallback>((fail) =>
{
Expand Down
1 change: 1 addition & 0 deletions src/DeltaLake/Bridge/include/delta-lake-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ void history(struct Runtime *_Nonnull runtime,

void table_update_incremental(struct Runtime *_Nonnull runtime,
struct RawDeltaTable *_Nonnull table,
int64_t max_version,
const struct CancellationToken *cancellation_token,
TableEmptyCallback callback);

Expand Down
9 changes: 8 additions & 1 deletion src/DeltaLake/Bridge/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use deltalake::{
dataframe::DataFrame,
datasource::{MemTable, TableProvider},
execution::context::{SQLOptions, SessionContext},
logical_expr::max,
},
kernel::StructType,
operations::{
Expand Down Expand Up @@ -546,17 +547,23 @@ pub extern "C" fn history(
pub extern "C" fn table_update_incremental(
mut runtime: NonNull<Runtime>,
mut table: NonNull<RawDeltaTable>,
max_version: i64,
cancellation_token: Option<&CancellationToken>,
callback: TableEmptyCallback,
) {
let max_version = if max_version > 0 {
Some(max_version)
} else {
None
};
run_async_with_cancellation!(
runtime,
table,
cancellation_token,
rt,
tbl,
{
match tbl.table.update_incremental(None).await {
match tbl.table.update_incremental(max_version).await {
Ok(_) => unsafe {
callback(std::ptr::null());
},
Expand Down
7 changes: 4 additions & 3 deletions src/DeltaLake/Table/DeltaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,14 @@ public Task AddConstraintsAsync(
}

/// <summary>
/// Updates table to latest version
/// Updates table to specific or latest version
/// </summary>
/// <param name="maxVersion">Optional maximum version</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/></param>
/// <returns></returns>
public Task UpdateIncrementalAsync(CancellationToken cancellationToken)
public Task UpdateIncrementalAsync(long? maxVersion, CancellationToken cancellationToken)
{
return _table.UpdateIncrementalAsync(cancellationToken);
return _table.UpdateIncrementalAsync(maxVersion, cancellationToken);
}

/// <summary>
Expand Down
29 changes: 29 additions & 0 deletions tests/DeltaLake.Tests/Table/ConstraintTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,35 @@ await table.AddConstraintsAsync(
}
}

[Fact]
public async Task Invalid_Constraint_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
await Assert.ThrowsAsync<DeltaLakeException>(() => table.AddConstraintsAsync(
new Dictionary<string, string>
{
["something isn't right"] = "invalid constraint",
},
new Dictionary<string, string>(),
CancellationToken.None));
}

[Fact]
public async Task Empty_Constraint_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
var version = table.Version();
await table.AddConstraintsAsync(
new Dictionary<string, string>(),
new Dictionary<string, string>(),
CancellationToken.None);
Assert.Equal(version, table.Version());
}

public static IEnumerable<object[]> TestCases()
{
yield return [1, "hello", 0, new Dictionary<string, string> { ["first"] = "first > 0" }, false];
Expand Down
11 changes: 11 additions & 0 deletions tests/DeltaLake.Tests/Table/InsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ public async Task File_System_Insert_Variable_Record_Count_Test(int length)
}
}

[Fact]
public async Task Memory_Insert_Zero_Record_Count_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
var version = table.Version();
await table.InsertAsync([], table.Schema(), new InsertOptions(), CancellationToken.None);
Assert.Equal(version, table.Version());
}

private async Task BaseInsertTest(string path, int length)
{
var data = await TableHelpers.SetupTable(path, length);
Expand Down
39 changes: 39 additions & 0 deletions tests/DeltaLake.Tests/Table/LoadTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,45 @@ public async Task Table_Load_Latest(TableIdentifier identifier, int minVersion,
Assert.Equal(expectedVersion, table.Version());
}

[Theory]
[InlineData(TableIdentifier.Checkpoints, 1, 12)]
[InlineData(TableIdentifier.CheckpointsVacuumed, 5, 12)]
[InlineData(TableIdentifier.Delta020, 1, 3)]
public async Task Table_Load_Update_Incremental(TableIdentifier identifier, int minVersion, int expectedVersion)
{
var location = identifier.TablePath();
using var runtime = new DeltaRuntime(RuntimeOptions.Default);
using var table = await DeltaTable.LoadAsync(runtime, location, new TableOptions
{
Version = minVersion,
},
CancellationToken.None);
Assert.Equal(minVersion, table.Version());
for (var version = minVersion; version <= expectedVersion; version++)
{
await table.UpdateIncrementalAsync(version, CancellationToken.None);
Assert.Equal(version, table.Version());
}
}

[Theory]
[InlineData(TableIdentifier.Checkpoints, 1, 12)]
[InlineData(TableIdentifier.CheckpointsVacuumed, 5, 12)]
[InlineData(TableIdentifier.Delta020, 1, 3)]
public async Task Table_Load_Update_Incremental_Latest(TableIdentifier identifier, int minVersion, int expectedVersion)
{
var location = identifier.TablePath();
using var runtime = new DeltaRuntime(RuntimeOptions.Default);
using var table = await DeltaTable.LoadAsync(runtime, location, new TableOptions
{
Version = minVersion,
},
CancellationToken.None);
Assert.Equal(minVersion, table.Version());
await table.UpdateIncrementalAsync(100, CancellationToken.None);
Assert.Equal(expectedVersion, table.Version());
}


[Fact]
public async Task Table_Load_Invalid_Uri_Type_Test()
Expand Down
23 changes: 23 additions & 0 deletions tests/DeltaLake.Tests/Table/MergeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,29 @@ UPDATE SET
});
}

[Fact]
public async Task Merge_Zero_Record_Count_Test()
{
var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0);
using var runtime = tableParts.runtime;
using var table = tableParts.table;
var version = table.Version();
await table.MergeAsync(@"MERGE INTO mytable USING newdata
ON newdata.test = mytable.test
WHEN NOT MATCHED BY TARGET
THEN INSERT (
test,
second,
third
)
VALUES (
newdata.test,
'inserted data',
99
)", [], table.Schema(), CancellationToken.None);
Assert.Equal(version, table.Version());
}

private async Task BaseMergeTest(string query, Action<IReadOnlyList<RecordBatch>> assertions)
{
var pair = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 10);
Expand Down
3 changes: 3 additions & 0 deletions tests/DeltaLake.Tests/Table/TableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public async Task Create_InMemory_Test()
var returnedSchema = table.Schema();
Assert.NotNull(returnedSchema);
Assert.Equal(schema.FieldsList.Count, returnedSchema.FieldsList.Count);
var protocol = table.ProtocolVersions();
Assert.True(protocol.MinimumReaderVersion > 0);
Assert.True(protocol.MinimumWriterVersion > 0);
}

[Fact]
Expand Down

0 comments on commit 1a52a95

Please sign in to comment.