Skip to content

Commit

Permalink
Introduce SyncToAsyncShim
Browse files Browse the repository at this point in the history
  • Loading branch information
mdrakiburrahman committed Oct 24, 2024
1 parent 6ecd720 commit a04e3e4
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 18 deletions.
4 changes: 2 additions & 2 deletions examples/local/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public static async Task Main(string[] args)

Console.WriteLine($"Table version after transaction: {table.Version()}");

Apache.Arrow.Table readTable = table.ReadAsArrowTable();
Apache.Arrow.Table readTable = await table.ReadAsArrowTableAsync(CancellationToken.None);
Console.WriteLine(readTable.ToString());

DataFrame df = table.ReadAsDataFrame();
DataFrame df = await table.ReadAsDataFrameAsync(CancellationToken.None);
Console.WriteLine(df.ToMarkdown());
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/DeltaLake/Interfaces/ITable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,17 @@ public interface ITable : IDisposable
/// <summary>
/// Read the delta table and return as <see cref="Apache.Arrow.Table"/>.
/// </summary>
Apache.Arrow.Table ReadAsArrowTable();
/// <param name="cancellationToken">A <see cref="System.Threading.CancellationToken">cancellation token</see>.</param>
Task<Apache.Arrow.Table> ReadAsArrowTableAsync(CancellationToken cancellationToken);

/// <summary>
/// Read the delta table and return as a <see cref="DataFrame"/>.
/// </summary>
/// <param name="cancellationToken">A <see cref="System.Threading.CancellationToken">cancellation token</see>.</param>
/// <remarks>
/// This loads the entire table into memory.
/// </remarks>
DataFrame ReadAsDataFrame();
Task<DataFrame> ReadAsDataFrameAsync(CancellationToken cancellationToken);

#endregion Read Operations

Expand Down
39 changes: 29 additions & 10 deletions src/DeltaLake/Kernel/Core/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using DeltaLake.Kernel.Callbacks.Allocators;
using DeltaLake.Kernel.Callbacks.Errors;
using DeltaLake.Kernel.Interop;
using DeltaLake.Kernel.Shim.Async;
using DeltaLake.Kernel.State;
using DeltaLake.Table;
using Microsoft.Data.Analysis;
Expand Down Expand Up @@ -187,26 +188,44 @@ internal unsafe Table(

#region Delta Kernel table operations

internal Apache.Arrow.Table ReadAsArrowTable()
/// <summary>
/// Reads the table as an <see cref="Apache.Arrow.Table"/>. After the
/// kernel-support check, the Arrow context's <c>ToTable()</c> conversion is
/// executed through <c>SyncToAsyncShim.ExecuteAsync</c> and the result awaited
/// with <c>ConfigureAwait(false)</c>.
/// </summary>
/// <param name="cancellationToken">
/// Token forwarded to the shim. NOTE(review): declared as <c>ICancellationToken</c>,
/// presumably an alias for <see cref="System.Threading.CancellationToken"/> - confirm.
/// </param>
/// <returns>A task that produces the Arrow table.</returns>
internal async Task<Apache.Arrow.Table> ReadAsArrowTableAsync(
ICancellationToken cancellationToken
)
{
this.ThrowIfKernelNotSupported();

unsafe
{
return this.state.ArrowContext(true)->ToTable();
}
return await SyncToAsyncShim
.ExecuteAsync(
() =>
{
// The pointer dereference needs an unsafe context, so it lives inside the delegate body.
unsafe
{
return this.state.ArrowContext(true)->ToTable();
}
},
cancellationToken
)
.ConfigureAwait(false);
}

internal DataFrame ReadAsDataFrame()
/// <summary>
/// Reads the table as a <see cref="DataFrame"/>. After the kernel-support
/// check, the Arrow context's <c>ToRecordBatch()</c> result is wrapped with
/// <c>DataFrame.FromArrowRecordBatch</c> inside a delegate executed through
/// <c>SyncToAsyncShim.ExecuteAsync</c>, and the result is awaited with
/// <c>ConfigureAwait(false)</c>.
/// </summary>
/// <param name="cancellationToken">
/// Token forwarded to the shim. NOTE(review): declared as <c>ICancellationToken</c>,
/// presumably an alias for <see cref="System.Threading.CancellationToken"/> - confirm.
/// </param>
/// <returns>A task that produces the data frame.</returns>
internal async Task<DataFrame> ReadAsDataFrameAsync(ICancellationToken cancellationToken)
{
this.ThrowIfKernelNotSupported();

unsafe
{
return await SyncToAsyncShim
.ExecuteAsync(
() =>
{
// The pointer dereference needs an unsafe context, so it lives inside the delegate body.
unsafe
{
#pragma warning disable CA2000 // DataFrames use the RecordBatch, so we don't need to dispose of it
return DataFrame.FromArrowRecordBatch(this.state.ArrowContext(true)->ToRecordBatch());
return DataFrame.FromArrowRecordBatch(this.state.ArrowContext(true)->ToRecordBatch());
#pragma warning restore CA2000
}
}
},
cancellationToken
)
.ConfigureAwait(false);
}

internal override long Version()
Expand Down
61 changes: 61 additions & 0 deletions src/DeltaLake/Kernel/Shim/Async/SyncToAsyncShim.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// -----------------------------------------------------------------------------
// <summary>
// A shim for turning synchronous operations to be asynchronous and cancellable.
// </summary>
//
// <copyright company="The Delta Lake Project Authors">
// Copyright (2024) The Delta Lake Project Authors. All rights reserved.
// Licensed under the Apache license. See LICENSE file in the project root for full license information.
// </copyright>
// -----------------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;

namespace DeltaLake.Kernel.Shim.Async
{
    /// <summary>
    /// Offloads synchronous operations to the thread pool with
    /// <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/> so that
    /// callers can await them.
    /// </summary>
    internal static class SyncToAsyncShim
    {
        /// <summary>
        /// Converts a synchronous operation to an awaitable operation that
        /// honours cancellation requested before the operation starts.
        /// </summary>
        /// <param name="action">Action to invoke.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <typeparam name="T">Type of the result.</typeparam>
        /// <returns>
        /// A task that yields the value returned by <paramref name="action"/>.
        /// The task is cancelled when <paramref name="cancellationToken"/> is
        /// signalled before <paramref name="action"/> begins, and faulted with
        /// whatever exception <paramref name="action"/> throws.
        /// </returns>
        /// <remarks>
        /// Cancellation is only observed up to the point where
        /// <paramref name="action"/> starts; once it is running it is not
        /// interrupted and runs to completion.
        /// </remarks>
        internal static async Task<T> ExecuteAsync<T>(
            Func<T> action,
            CancellationToken cancellationToken
        )
        {
            // The task returned by Task.Run is awaited directly. Routing the
            // outcome through a separate TaskCompletionSource that is only
            // completed from inside the delegate would hang forever when the
            // token is already cancelled, because Task.Run then never invokes
            // the delegate.
            return await Task.Run<T>(
                    () =>
                    {
                        // Covers a cancellation that arrives after the work was
                        // queued but before it was picked up by a worker thread.
                        cancellationToken.ThrowIfCancellationRequested();
                        return action();
                    },
                    cancellationToken
                )
                .ConfigureAwait(false);
        }
    }
}
7 changes: 5 additions & 2 deletions src/DeltaLake/Table/DeltaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ [EnumeratorCancellation] CancellationToken cancellationToken
}

/// <inheritdoc/>
public Apache.Arrow.Table ReadAsArrowTable() => this.table.ReadAsArrowTable();
public async Task<Apache.Arrow.Table> ReadAsArrowTableAsync(
CancellationToken cancellationToken
) => await this.table.ReadAsArrowTableAsync(cancellationToken).ConfigureAwait(false);

/// <inheritdoc/>
public DataFrame ReadAsDataFrame() => this.table.ReadAsDataFrame();
public async Task<DataFrame> ReadAsDataFrameAsync(CancellationToken cancellationToken) =>
await this.table.ReadAsDataFrameAsync(cancellationToken).ConfigureAwait(false);

/// <inheritdoc/>
public async Task InsertAsync(
Expand Down
4 changes: 2 additions & 2 deletions tests/DeltaLake.Tests/Table/KernelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ await policy.ExecuteAsync(async () =>
{
// Exercise: Reads via Kernel
//
Apache.Arrow.Table arrowTable = threadIsolatedTable.ReadAsArrowTable();
DataFrame dataFrame = threadIsolatedTable.ReadAsDataFrame();
Apache.Arrow.Table arrowTable = await threadIsolatedTable.ReadAsArrowTableAsync(default);
DataFrame dataFrame = await threadIsolatedTable.ReadAsDataFrameAsync(default);
string stringResult = dataFrame.ToMarkdown();
// Validate: Data Integrity
Expand Down

0 comments on commit a04e3e4

Please sign in to comment.