Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Delta Kernel FFI support for read path and expose Apache.Arrow.Table and Microsoft.Data.Analysis.DataFrame read interface #89

Merged
merged 107 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
6541108
Added git submodule for delta-kernel-rs with version pinning
mdrakiburrahman Oct 1, 2024
eea3526
README updates for module init
mdrakiburrahman Oct 1, 2024
1ee5d1e
Added FFI generation README
mdrakiburrahman Oct 1, 2024
b224e0a
Pass in "cloud" feature
mdrakiburrahman Oct 1, 2024
c8e6d15
Updated README
mdrakiburrahman Oct 1, 2024
ddc5a1e
Generated CSharp classes in one file
mdrakiburrahman Oct 1, 2024
cfec531
Moved docs to CONTRIBUTING
mdrakiburrahman Oct 1, 2024
3b7a7c1
Update local test with temp changes, do not commit
mdrakiburrahman Oct 1, 2024
2210400
Interfaced the key implementations and ran unit tests green
mdrakiburrahman Oct 2, 2024
c63255a
Updated signature
mdrakiburrahman Oct 2, 2024
e7daf85
Prep delta-rs methods to be overrideable
mdrakiburrahman Oct 2, 2024
c96575f
Header
mdrakiburrahman Oct 2, 2024
7a01f02
Passing in TableStorageOptions
mdrakiburrahman Oct 2, 2024
789c9d3
Simplify Engine interface and pass in TableLocation into TableStorage…
mdrakiburrahman Oct 2, 2024
4251edf
Quick version test
mdrakiburrahman Oct 3, 2024
c567926
Added Version support via Kernel and ran all UTs
mdrakiburrahman Oct 3, 2024
56708d8
Added Uri implementation via Kernel
mdrakiburrahman Oct 3, 2024
94c3eba
Add PartitionColumns() override from Kernel
mdrakiburrahman Oct 4, 2024
ebf8edb
Docs
mdrakiburrahman Oct 4, 2024
ea98bf1
Moved PartitionList into managed state
mdrakiburrahman Oct 4, 2024
4ddc671
Update FFI gen
mdrakiburrahman Oct 5, 2024
408d15d
Remove unnecessary import
mdrakiburrahman Oct 5, 2024
5ee5ef7
Checkpoint: improved naming for pointer management, and some read sup…
mdrakiburrahman Oct 5, 2024
b38df01
Fixed problem with KernelState refreshing parent nodes
mdrakiburrahman Oct 5, 2024
262fc6b
Added Zero-copy from Parquet to Arrow
mdrakiburrahman Oct 5, 2024
dee0cdc
Working with String representation
mdrakiburrahman Oct 5, 2024
7dd9a9e
Converted entire table into DataFrame
mdrakiburrahman Oct 5, 2024
d566d3c
Added unit test for Kernel and caught Partition problem
mdrakiburrahman Oct 6, 2024
dbc945f
Make the test run parallel
mdrakiburrahman Oct 6, 2024
96e6e80
Proofread
mdrakiburrahman Oct 6, 2024
456e5bf
Proofread
mdrakiburrahman Oct 6, 2024
160a9b0
Proofread
mdrakiburrahman Oct 6, 2024
02f563a
Proofread
mdrakiburrahman Oct 6, 2024
36415dd
Merge remote-tracking branch 'origin/main' into dev/mdrrahman/kernel-…
mdrakiburrahman Oct 6, 2024
bc5fdfa
Proofread
mdrakiburrahman Oct 6, 2024
03ba177
Proofread
mdrakiburrahman Oct 6, 2024
de0fd31
Proofread
mdrakiburrahman Oct 6, 2024
95541f3
Proofread
mdrakiburrahman Oct 6, 2024
43114ff
Proofread
mdrakiburrahman Oct 6, 2024
cd31c37
Proofread
mdrakiburrahman Oct 6, 2024
d0ca4b4
Add docstring
mdrakiburrahman Oct 6, 2024
3f0afb2
Proofread
mdrakiburrahman Oct 6, 2024
6cb9fef
Proofread
mdrakiburrahman Oct 6, 2024
db53471
GetFunctionPointerForDelegate does not need to be disposed
mdrakiburrahman Oct 6, 2024
f10d0d3
Proofread
mdrakiburrahman Oct 6, 2024
332ef4b
Proofread
mdrakiburrahman Oct 6, 2024
8f448dc
Proofread
mdrakiburrahman Oct 6, 2024
2ebf2c3
Get rid of ArrowFfiSchemaConverter and simply cast FFI_ArrowSchema po…
mdrakiburrahman Oct 6, 2024
0e3134a
Update CSPROJ with Cargo
mdrakiburrahman Oct 6, 2024
721762a
Update test CSPROJ with binary copy
mdrakiburrahman Oct 6, 2024
0f3c545
Proofread
mdrakiburrahman Oct 6, 2024
e2efdc4
Fix build warnings
mdrakiburrahman Oct 6, 2024
a739fa6
Build: checkout specific branch of kernel
mdrakiburrahman Oct 6, 2024
3aeb94d
YAML
mdrakiburrahman Oct 6, 2024
a552f3e
YAML
mdrakiburrahman Oct 6, 2024
db85b6c
YAML
mdrakiburrahman Oct 6, 2024
83ead4f
YAML
mdrakiburrahman Oct 6, 2024
1830ffa
YAML
mdrakiburrahman Oct 6, 2024
791cb16
dotnet format
mdrakiburrahman Oct 6, 2024
1b68e79
Skip Nuget push if already exists
mdrakiburrahman Oct 6, 2024
7fe50e7
Fix tricky cargo FFI spinlock problem
mdrakiburrahman Oct 6, 2024
ef22eac
Missed one cargo command
mdrakiburrahman Oct 6, 2024
78e33f9
Update codeql with Git checkout
mdrakiburrahman Oct 6, 2024
48169a1
Add parallelized reads, that does not currently pass
mdrakiburrahman Oct 6, 2024
0236a92
Whitespace format
mdrakiburrahman Oct 6, 2024
ca998b5
Run nuget pipeline
mdrakiburrahman Oct 6, 2024
04b6510
YAML
mdrakiburrahman Oct 7, 2024
a580bd0
Use script for git checkout
mdrakiburrahman Oct 7, 2024
bf6db8e
chmod
mdrakiburrahman Oct 7, 2024
89f28a1
Try recursive chmod
mdrakiburrahman Oct 7, 2024
ab38782
Add OpenSSL for ubuntu-arm
mdrakiburrahman Oct 7, 2024
0b05e92
Attempt adding dep to openssl
mdrakiburrahman Oct 7, 2024
48fbc15
Adding OpenSSL to Cargo.toml did not work, revert
mdrakiburrahman Oct 7, 2024
3621208
Attempt adding Cross.toml
mdrakiburrahman Oct 7, 2024
d11f6e5
Add CROSS_CONFIG via TOML file
mdrakiburrahman Oct 7, 2024
8444c03
YAML
mdrakiburrahman Oct 7, 2024
45671ce
YAML
mdrakiburrahman Oct 7, 2024
b170ba4
YAML
mdrakiburrahman Oct 7, 2024
001569d
YAML
mdrakiburrahman Oct 7, 2024
ae617e8
Attempt
mdrakiburrahman Oct 7, 2024
e800b67
YAML
mdrakiburrahman Oct 7, 2024
389a3c1
YAML
mdrakiburrahman Oct 7, 2024
e03e549
YAML
mdrakiburrahman Oct 7, 2024
59b7fb5
YAML
mdrakiburrahman Oct 7, 2024
1f78dd6
Proofread
mdrakiburrahman Oct 7, 2024
edbbb7a
Proofread
mdrakiburrahman Oct 7, 2024
7cb1d38
Build warning
mdrakiburrahman Oct 7, 2024
cf969a6
CRLF -> LF changes
mdrakiburrahman Oct 7, 2024
17d4641
Unskip test - worked on Linux single threaded read
mdrakiburrahman Oct 7, 2024
5907432
Solved the concurrency problem (tested on Linux)
mdrakiburrahman Oct 7, 2024
6a50637
Add assertion for number of partitions
mdrakiburrahman Oct 7, 2024
b55a973
Concurrent read checkpoint
mdrakiburrahman Oct 7, 2024
d2aecf5
Moved ArrowContext into ManagedTableState and solved a bunch of point…
mdrakiburrahman Oct 8, 2024
e24fa48
Proofread
mdrakiburrahman Oct 8, 2024
968d53d
Proofread
mdrakiburrahman Oct 8, 2024
cbe3a44
PtrToStringAnsi -> PtrToStringUTF8
mdrakiburrahman Oct 8, 2024
aa9e205
Cast pointer to byte instead of char
mdrakiburrahman Oct 8, 2024
f25536c
Merge branch 'main' into dev/mdrrahman/kernel-ffi-submodule
mdrakiburrahman Oct 8, 2024
cb38beb
Dev env setup script
mdrakiburrahman Oct 8, 2024
51e344d
Add libssl-dev
mdrakiburrahman Oct 8, 2024
65d856c
Docs
mdrakiburrahman Oct 8, 2024
0376c69
Path missing "/" from test
mdrakiburrahman Oct 8, 2024
4811e6a
Add sample output
mdrakiburrahman Oct 8, 2024
efc8e95
StringToHGlobalAnsi -> StringToCoTaskMemUTF8, tested on Windows and L…
mdrakiburrahman Oct 8, 2024
b4d065d
Updated Architecture Diagram
mdrakiburrahman Oct 8, 2024
c48108a
Proofread
mdrakiburrahman Oct 8, 2024
d3561c9
Proofread
mdrakiburrahman Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 161 additions & 10 deletions src/DeltaLake/Kernel/Arrow/Extensions/ArrowContextExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Apache.Arrow;
using Apache.Arrow.C;
using Apache.Arrow.Types;
using DeltaLake.Kernel.Interop;
using DeltaLake.Kernel.State;

namespace DeltaLake.Kernel.Arrow.Extensions
Expand All @@ -21,29 +25,176 @@ namespace DeltaLake.Kernel.Arrow.Extensions
/// </summary>
internal static class ArrowContextExtensions
{

#region Extension Methods

/// <summary>
/// Converts an Arrow Context to an Arrow Table.
/// Converts an Arrow Context to <see cref="Apache.Arrow.Table"/>
/// </summary>
/// <param name="context">The Arrow Context to convert.</param>
/// <returns>The converted Arrow Table.</returns>
internal static unsafe Apache.Arrow.Table ToTable(this ArrowContext context)
{
if (context == null || context.Schema == null)
(Schema schema, List<RecordBatch> recordBatches) = context.ToSchematizedBatches();
return Apache.Arrow.Table.TableFromRecordBatches(schema, recordBatches);
}

/// <summary>
/// Converts an Arrow Context to a single <see cref="Apache.Arrow.RecordBatch"/>"/>
/// </summary>
/// <remarks>
/// Inspired from https://github.com/apache/arrow/issues/35371, this is
/// the only documented way to convert a list of <see cref="RecordBatch"/>es
/// reliably to a single <see cref="RecordBatch"/>.
/// </remarks>
/// <param name="context">The Arrow Context to convert.</param>
/// <returns>The converted Record Batch.</returns>
internal static unsafe RecordBatch ToRecordBatch(this ArrowContext context)
{
(Schema schema, List<RecordBatch> recordBatches) = context.ToSchematizedBatches();
List<IArrowArray> concatenatedColumns = new();

foreach (Field field in schema.FieldsList)
{
throw new ArgumentException($"Invalid ArrowContext provided for Table conversion: cannot convert to Table without schema.");
List<IArrowArray> columnArrays = new();
foreach (RecordBatch recordBatch in recordBatches)
{
IArrowArray column = recordBatch.Column(field.Name);
columnArrays.Add(column);
}
IArrowArray concatenatedColumn = ArrowArrayConcatenator.Concatenate(columnArrays);
concatenatedColumns.Add(concatenatedColumn);
}
return Apache.Arrow.Table.TableFromRecordBatches(context.Schema, context.ToRecordBatches());
return new RecordBatch(schema, concatenatedColumns, concatenatedColumns[0].Length);
}

internal static unsafe List<RecordBatch> ToRecordBatches(this ArrowContext context)
/// <summary>
/// Converts an Arrow Context to a list of <see cref="Apache.Arrow.RecordBatch"/>"/>
/// and <see cref="Apache.Arrow.Schema"/>.
/// </summary>
/// <param name="context">The Arrow Context to convert.</param>
/// <returns>The converted tuple of Arrow Schema and Record Batches.</returns>
internal static (Schema, List<RecordBatch>) ToSchematizedBatches(this ArrowContext context)
{
if (context == null || context.NumBatches == 0 || context.Batches == null)
context.ValidateContext();

List<RecordBatch> recordBatches = new(context.NumBatches);
List<Schema> schemas = new(context.NumBatches);

for (int i = 0; i < context.NumBatches; i++)
{
throw new ArgumentException($"Invalid ArrowContext provided for RecordBatch conversion: contains one or more null pointers with {context?.NumBatches} Record Batches.");
unsafe
{
ArrowFFIData* arrowStructPtr = context.ArrowStructs[i];
ParquetStringPartitions* partitionsPtr = context.Partitions[i];

Schema schemaWithoutPartition = CArrowSchemaImporter.ImportSchema((CArrowSchema*)&arrowStructPtr->schema);
RecordBatch batchWithoutPartition = CArrowArrayImporter.ImportRecordBatch((CArrowArray*)&arrowStructPtr->array, schemaWithoutPartition);

Schema schemaWithPartition = AddPartitionColumnsToSchema(schemaWithoutPartition, partitionsPtr);
RecordBatch batchWithPartition = AddPartitionColumnsToRecordBatch(batchWithoutPartition, partitionsPtr);

recordBatches.Add(batchWithPartition);
schemas.Add(schemaWithPartition);
}
}
List<RecordBatch> recordBatches = new(context.NumBatches);
for (int i = 0; i < context.NumBatches; i++) recordBatches.Add(*context.Batches[i]);
return recordBatches;

for (int i = 1; i < schemas.Count; i++)
{
if (schemas[i] == schemas[0]) throw new InvalidOperationException($"All schemas must be the same in - got {i}th {schemas[i]} != 0th {schemas[0]}");
}

return (schemas[0], recordBatches);
}

#endregion Extension Methods

#region Private Methods

private static void ValidateContext(this ArrowContext context)
{
if (context.NumBatches == 0)
{
throw new InvalidOperationException("Arrow Context must contain at least one RecordBatch");
}
}

#pragma warning disable CA1859, IDE0060 // Although we're not using partitionValue right now, it will be used when Kernel supports reporting Arrow Schema
private static unsafe IArrowType DeterminePartitionColumnType(string colName, string colValue)
{
// Currently, there's no way to determine the type of the partition,
// because the Kernel always represents partition values as strings in CStringMap.
//
// We have a request with Kernel team here to get back the Arrow Schema from
// the Delta Transaction Log:
//
// >>> https://delta-users.slack.com/archives/C04TRPG3LHZ/p1728178727958499
// >>> https://delta-users.slack.com/archives/C04TRPG3LHZ/p1728001059452499?thread_ts=1727999835.930339&cid=C04TRPG3LHZ
//
return StringType.Default;
}
#pragma warning restore CA1859, IDE0060

private static unsafe Schema AddPartitionColumnsToSchema(Apache.Arrow.Schema originalSchema, ParquetStringPartitions* partitionsPtr)
{
Apache.Arrow.Schema.Builder schemaBuilder = new();
foreach (Field field in originalSchema.FieldsList)
{
schemaBuilder = schemaBuilder.Field(field);
}

for (int i = 0; i < partitionsPtr->Len; i++)
{
#pragma warning disable CS8600, CS8604 // If Kernel sends us back null pointers, we are in trouble anyway
string colName = Marshal.PtrToStringAnsi((IntPtr)partitionsPtr->ColNames[i]);
string colValue = Marshal.PtrToStringAnsi((IntPtr)partitionsPtr->ColValues[i]);
mdrakiburrahman marked this conversation as resolved.
Show resolved Hide resolved
IArrowType dataType = DeterminePartitionColumnType(colName, colValue);
#pragma warning restore CS8600, CS8604

Field field = new(colName, dataType, nullable: true);
schemaBuilder = schemaBuilder.Field(field);
}

return schemaBuilder.Build();
}

private static unsafe RecordBatch AddPartitionColumnsToRecordBatch(RecordBatch recordBatch, ParquetStringPartitions* partitionsPtr)
{
Apache.Arrow.Schema.Builder schemaBuilder = new();
var columnList = new List<IArrowArray>();

// Append the original fieldList to the schema builder.
//
foreach (Field field in recordBatch.Schema.FieldsList) schemaBuilder = schemaBuilder.Field(field);

// Append the original columns to the column list.
//
for (int i = 0; i < recordBatch.Schema.FieldsList.Count; i++) columnList.Add(recordBatch.Column(i));

// Convert each of the partition column metadata structs into actual
// columns, then add it to the schema builder and column list.
//
for (int i = 0; i < partitionsPtr->Len; i++)
{
StringArray.Builder columnBuilder = new();

#pragma warning disable CS8600
string colName = Marshal.PtrToStringAnsi((IntPtr)partitionsPtr->ColNames[i]);
string colValue = Marshal.PtrToStringAnsi((IntPtr)partitionsPtr->ColValues[i]);
#pragma warning restore CS8600

Field field = new(colName, StringType.Default, nullable: true);
schemaBuilder = schemaBuilder.Field(field);

for (int j = 0; j < recordBatch.Length; j++)
{
columnBuilder = columnBuilder.Append(colValue ?? "");
}
columnList.Add(columnBuilder.Build());
}
return new RecordBatch(schemaBuilder.Build(), columnList, recordBatch.Length);
}

#endregion Private Methods
}
}
Loading
Loading