From 1a52a95365ff805fc905f13f6efe9218cd8d9540 Mon Sep 17 00:00:00 2001 From: KyJah Keys Date: Fri, 16 Feb 2024 09:23:56 -0500 Subject: [PATCH] added more tests --- src/DeltaLake/Bridge/Interop/Interop.cs | 4 +- src/DeltaLake/Bridge/Table.cs | 8 +++- .../Bridge/include/delta-lake-bridge.h | 1 + src/DeltaLake/Bridge/src/table.rs | 9 ++++- src/DeltaLake/Table/DeltaTable.cs | 7 ++-- .../DeltaLake.Tests/Table/ConstraintTests.cs | 29 ++++++++++++++ tests/DeltaLake.Tests/Table/InsertTests.cs | 11 ++++++ tests/DeltaLake.Tests/Table/LoadTests.cs | 39 +++++++++++++++++++ tests/DeltaLake.Tests/Table/MergeTests.cs | 23 +++++++++++ tests/DeltaLake.Tests/Table/TableTests.cs | 3 ++ 10 files changed, 127 insertions(+), 7 deletions(-) diff --git a/src/DeltaLake/Bridge/Interop/Interop.cs b/src/DeltaLake/Bridge/Interop/Interop.cs index 9dd2938..86604a1 100644 --- a/src/DeltaLake/Bridge/Interop/Interop.cs +++ b/src/DeltaLake/Bridge/Interop/Interop.cs @@ -386,7 +386,7 @@ internal static unsafe partial class Methods public static extern void history([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("uintptr_t")] UIntPtr limit, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("GenericErrorCallback")] IntPtr callback); [DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] - public static extern void table_update_incremental([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback); + public static extern void table_update_incremental([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* 
table, [NativeTypeName("int64_t")] long max_version, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback); [DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] public static extern void table_load_version([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("int64_t")] long version, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback); @@ -396,7 +396,7 @@ internal static unsafe partial class Methods public static extern byte table_load_with_datetime([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("int64_t")] long ts_milliseconds, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback); [DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] - public static extern void table_merge([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* delta_table, [NativeTypeName("struct ByteArrayRef * _Nonnull")] ByteArrayRef* query, void* stream, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("GenericErrorCallback")] IntPtr callback); + public static extern void table_merge([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* delta_table, [NativeTypeName("struct ByteArrayRef * _Nonnull")] ByteArrayRef* query, [NativeTypeName("void * _Nonnull")] void* stream, [NativeTypeName("const struct CancellationToken 
*")] CancellationToken* cancellation_token, [NativeTypeName("GenericErrorCallback")] IntPtr callback); [DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] [return: NativeTypeName("struct ProtocolResponse")] diff --git a/src/DeltaLake/Bridge/Table.cs b/src/DeltaLake/Bridge/Table.cs index b09fb41..12824fd 100644 --- a/src/DeltaLake/Bridge/Table.cs +++ b/src/DeltaLake/Bridge/Table.cs @@ -474,6 +474,11 @@ public async Task HistoryAsync(ulong limit, ICancellationToken cancellat public async Task AddConstraintAsync(IReadOnlyDictionary constraints, IReadOnlyDictionary? customMetadata, ICancellationToken cancellationToken) { + if (constraints.Count == 0) + { + return; + } + var tsc = new TaskCompletionSource(); using (var scope = new Scope()) { @@ -506,7 +511,7 @@ public async Task AddConstraintAsync(IReadOnlyDictionary constra } } - public async Task UpdateIncrementalAsync(ICancellationToken cancellationToken) + public async Task UpdateIncrementalAsync(long? maxVersion, ICancellationToken cancellationToken) { var tsc = new TaskCompletionSource(); using (var scope = new Scope()) @@ -516,6 +521,7 @@ public async Task UpdateIncrementalAsync(ICancellationToken cancellationToken) Methods.table_update_incremental( _runtime.Ptr, _ptr, + maxVersion ?? 
-1L, scope.CancellationToken(cancellationToken), scope.FunctionPointer((fail) => { diff --git a/src/DeltaLake/Bridge/include/delta-lake-bridge.h b/src/DeltaLake/Bridge/include/delta-lake-bridge.h index 9924c0c..de1a650 100644 --- a/src/DeltaLake/Bridge/include/delta-lake-bridge.h +++ b/src/DeltaLake/Bridge/include/delta-lake-bridge.h @@ -302,6 +302,7 @@ void history(struct Runtime *_Nonnull runtime, void table_update_incremental(struct Runtime *_Nonnull runtime, struct RawDeltaTable *_Nonnull table, + int64_t max_version, const struct CancellationToken *cancellation_token, TableEmptyCallback callback); diff --git a/src/DeltaLake/Bridge/src/table.rs b/src/DeltaLake/Bridge/src/table.rs index 4df4a94..837e620 100644 --- a/src/DeltaLake/Bridge/src/table.rs +++ b/src/DeltaLake/Bridge/src/table.rs @@ -18,6 +18,7 @@ use deltalake::{ dataframe::DataFrame, datasource::{MemTable, TableProvider}, execution::context::{SQLOptions, SessionContext}, + logical_expr::max, }, kernel::StructType, operations::{ @@ -546,9 +547,15 @@ pub extern "C" fn history( pub extern "C" fn table_update_incremental( mut runtime: NonNull, mut table: NonNull, + max_version: i64, cancellation_token: Option<&CancellationToken>, callback: TableEmptyCallback, ) { + let max_version = if max_version >= 0 { + Some(max_version) + } else { + None + }; run_async_with_cancellation!( runtime, table, @@ -556,7 +563,7 @@ pub extern "C" fn table_update_incremental( rt, tbl, { - match tbl.table.update_incremental(None).await { + match tbl.table.update_incremental(max_version).await { Ok(_) => unsafe { callback(std::ptr::null()); }, diff --git a/src/DeltaLake/Table/DeltaTable.cs b/src/DeltaLake/Table/DeltaTable.cs index 56f9398..562c760 100644 --- a/src/DeltaLake/Table/DeltaTable.cs +++ b/src/DeltaLake/Table/DeltaTable.cs @@ -186,13 +186,14 @@ public Task AddConstraintsAsync( } /// - /// Updates table to latest version + /// Updates table to specific or latest version /// + /// Optional maximum version /// /// - public 
Task UpdateIncrementalAsync(CancellationToken cancellationToken) + public Task UpdateIncrementalAsync(long? maxVersion, CancellationToken cancellationToken) { - return _table.UpdateIncrementalAsync(cancellationToken); + return _table.UpdateIncrementalAsync(maxVersion, cancellationToken); } /// diff --git a/tests/DeltaLake.Tests/Table/ConstraintTests.cs b/tests/DeltaLake.Tests/Table/ConstraintTests.cs index c79a51a..e828dd4 100644 --- a/tests/DeltaLake.Tests/Table/ConstraintTests.cs +++ b/tests/DeltaLake.Tests/Table/ConstraintTests.cs @@ -40,6 +40,35 @@ await table.AddConstraintsAsync( } } + [Fact] + public async Task Invalid_Constraint_Test() + { + var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0); + using var runtime = tableParts.runtime; + using var table = tableParts.table; + await Assert.ThrowsAsync(() => table.AddConstraintsAsync( + new Dictionary + { + ["something isn't right"] = "invalid constraint", + }, + new Dictionary(), + CancellationToken.None)); + } + + [Fact] + public async Task Empty_Constraint_Test() + { + var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0); + using var runtime = tableParts.runtime; + using var table = tableParts.table; + var version = table.Version(); + await table.AddConstraintsAsync( + new Dictionary(), + new Dictionary(), + CancellationToken.None); + Assert.Equal(version, table.Version()); + } + public static IEnumerable TestCases() { yield return [1, "hello", 0, new Dictionary { ["first"] = "first > 0" }, false]; diff --git a/tests/DeltaLake.Tests/Table/InsertTests.cs b/tests/DeltaLake.Tests/Table/InsertTests.cs index aad096e..da50893 100644 --- a/tests/DeltaLake.Tests/Table/InsertTests.cs +++ b/tests/DeltaLake.Tests/Table/InsertTests.cs @@ -31,6 +31,17 @@ public async Task File_System_Insert_Variable_Record_Count_Test(int length) } } + [Fact] + public async Task Memory_Insert_Zero_Record_Count_Test() + { + var tableParts = await 
TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0); + using var runtime = tableParts.runtime; + using var table = tableParts.table; + var version = table.Version(); + await table.InsertAsync([], table.Schema(), new InsertOptions(), CancellationToken.None); + Assert.Equal(version, table.Version()); + } + private async Task BaseInsertTest(string path, int length) { var data = await TableHelpers.SetupTable(path, length); diff --git a/tests/DeltaLake.Tests/Table/LoadTests.cs b/tests/DeltaLake.Tests/Table/LoadTests.cs index 6a703cd..adaf361 100644 --- a/tests/DeltaLake.Tests/Table/LoadTests.cs +++ b/tests/DeltaLake.Tests/Table/LoadTests.cs @@ -74,6 +74,45 @@ public async Task Table_Load_Latest(TableIdentifier identifier, int minVersion, Assert.Equal(expectedVersion, table.Version()); } + [Theory] + [InlineData(TableIdentifier.Checkpoints, 1, 12)] + [InlineData(TableIdentifier.CheckpointsVacuumed, 5, 12)] + [InlineData(TableIdentifier.Delta020, 1, 3)] + public async Task Table_Load_Update_Incremental(TableIdentifier identifier, int minVersion, int expectedVersion) + { + var location = identifier.TablePath(); + using var runtime = new DeltaRuntime(RuntimeOptions.Default); + using var table = await DeltaTable.LoadAsync(runtime, location, new TableOptions + { + Version = minVersion, + }, + CancellationToken.None); + Assert.Equal(minVersion, table.Version()); + for (var version = minVersion; version <= expectedVersion; version++) + { + await table.UpdateIncrementalAsync(version, CancellationToken.None); + Assert.Equal(version, table.Version()); + } + } + + [Theory] + [InlineData(TableIdentifier.Checkpoints, 1, 12)] + [InlineData(TableIdentifier.CheckpointsVacuumed, 5, 12)] + [InlineData(TableIdentifier.Delta020, 1, 3)] + public async Task Table_Load_Update_Incremental_Latest(TableIdentifier identifier, int minVersion, int expectedVersion) + { + var location = identifier.TablePath(); + using var runtime = new DeltaRuntime(RuntimeOptions.Default); + using var table = 
await DeltaTable.LoadAsync(runtime, location, new TableOptions + { + Version = minVersion, + }, + CancellationToken.None); + Assert.Equal(minVersion, table.Version()); + await table.UpdateIncrementalAsync(100, CancellationToken.None); + Assert.Equal(expectedVersion, table.Version()); + } + [Fact] public async Task Table_Load_Invalid_Uri_Type_Test() diff --git a/tests/DeltaLake.Tests/Table/MergeTests.cs b/tests/DeltaLake.Tests/Table/MergeTests.cs index 8deafb9..07083a8 100644 --- a/tests/DeltaLake.Tests/Table/MergeTests.cs +++ b/tests/DeltaLake.Tests/Table/MergeTests.cs @@ -138,6 +138,29 @@ UPDATE SET }); } + [Fact] + public async Task Merge_Zero_Record_Count_Test() + { + var tableParts = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 0); + using var runtime = tableParts.runtime; + using var table = tableParts.table; + var version = table.Version(); + await table.MergeAsync(@"MERGE INTO mytable USING newdata + ON newdata.test = mytable.test + WHEN NOT MATCHED BY TARGET + THEN INSERT ( + test, + second, + third + ) + VALUES ( + newdata.test, + 'inserted data', + 99 + )", [], table.Schema(), CancellationToken.None); + Assert.Equal(version, table.Version()); + } + private async Task BaseMergeTest(string query, Action> assertions) { var pair = await TableHelpers.SetupTable($"memory://{Guid.NewGuid():N}", 10); diff --git a/tests/DeltaLake.Tests/Table/TableTests.cs b/tests/DeltaLake.Tests/Table/TableTests.cs index feb3fd6..d460737 100644 --- a/tests/DeltaLake.Tests/Table/TableTests.cs +++ b/tests/DeltaLake.Tests/Table/TableTests.cs @@ -44,6 +44,9 @@ public async Task Create_InMemory_Test() var returnedSchema = table.Schema(); Assert.NotNull(returnedSchema); Assert.Equal(schema.FieldsList.Count, returnedSchema.FieldsList.Count); + var protocol = table.ProtocolVersions(); + Assert.True(protocol.MinimumReaderVersion > 0); + Assert.True(protocol.MinimumWriterVersion > 0); } [Fact]