allow append-only tables in deltalake not to have primary key (#8229)
GitOrigin-RevId: fe0625b9b357a93c293145cf9553a3e80e66a591
zxqfd555-pw authored and Manul from Pathway committed Feb 17, 2025
1 parent 1be2234 commit 03d1d30
Showing 8 changed files with 43 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
 - **BREAKING**: Changed the interface of `LLMReranker`, the `use_logit_bias`, `cache_strategy`, `retry_strategy` and `kwargs` arguments are no longer supported.
 - **BREAKING**: LLMReranker no longer inherits from pw.UDF
 - **BREAKING**: `pw.stdlib.utils.AsyncTransformer.output_table` now returns a table with columns with Future data type.
+- `pw.io.deltalake.read` can now read append-only tables without requiring explicit specification of primary key fields.

 ## [0.18.0] - 2025-02-07

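For context on the changelog entry, here is a minimal usage sketch of the relaxed requirement: a schema with no primary key fields, reading a lake that is append-only. The lake path and output file are hypothetical.

```python
import pathway as pw


# No primary_key declared: accepted as of this change,
# provided the source table carries the delta.appendOnly property.
class InputSchema(pw.Schema):
    k: int
    v: str


table = pw.io.deltalake.read("./lake", schema=InputSchema)
pw.io.csv.write(table, "./out.csv")
pw.run()
```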
11 changes: 3 additions & 8 deletions python/pathway/io/deltalake/__init__.py
@@ -59,9 +59,9 @@ def read(
     Reads a table from Delta Lake. Currently, local and S3 lakes are supported. The table
     doesn't have to be append only, however, the deletion vectors are not supported yet.
-    Note that the connector requires primary key fields to be specified in the schema.
-    You can specify the fields to be used in the primary key with ``pw.column_definition``
-    function.
+    Note that the connector requires either the table to be append-only or the primary key
+    fields to be specified in the schema. You can define the primary key fields using the
+    ``pw.column_definition`` function.

     Args:
         uri: URI of the Delta Lake source that must be read.
@@ -136,11 +136,6 @@ def read(
         are provided but the path starts with ``s3://`` or ``s3a://``, Pathway will use the
         credentials of the currently authenticated user.
     """
-    if schema.primary_key_columns() is None:
-        raise ValueError(
-            "DeltaLake reader requires explicit primary key fields specification"
-        )
-
     _check_entitlements("deltalake")
     prepared_connection_settings = _prepare_s3_connection_settings(
         s3_connection_settings
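The updated docstring describes two acceptable setups; a hedged sketch of both, with hypothetical lake paths:

```python
import pathway as pw


# Option 1: explicit primary key fields; works for any Delta table.
class KeyedSchema(pw.Schema):
    k: int = pw.column_definition(primary_key=True)
    v: str


# Option 2: no primary key; valid only for append-only tables.
class PlainSchema(pw.Schema):
    k: int
    v: str


keyed = pw.io.deltalake.read("./any_lake", schema=KeyedSchema)
plain = pw.io.deltalake.read("./append_only_lake", schema=PlainSchema)
```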
22 changes: 17 additions & 5 deletions python/pathway/tests/test_io.py
@@ -3280,7 +3280,8 @@ def test_airbyte_local_docker_run(env_vars, tmp_path_with_airbyte_config):


 @only_with_license_key
-def test_deltalake_roundtrip(tmp_path: pathlib.Path):
+@pytest.mark.parametrize("has_primary_key", [True, False])
+def test_deltalake_roundtrip(has_primary_key: bool, tmp_path: pathlib.Path):
     data = """
         k | v
         1 | foo
@@ -3293,7 +3294,7 @@ def test_deltalake_roundtrip(tmp_path: pathlib.Path):
     write_csv(input_path, data)

     class InputSchema(pw.Schema):
-        k: int = pw.column_definition(primary_key=True)
+        k: int = pw.column_definition(primary_key=has_primary_key)
         v: str

     table = pw.io.csv.read(str(input_path), schema=InputSchema, mode="static")
@@ -3550,15 +3551,26 @@ class InputSchema(pw.Schema):

 @only_with_license_key
 def test_deltalake_no_primary_key(tmp_path: pathlib.Path):
+    data = [{"k": 1, "v": "one"}, {"k": 2, "v": "two"}, {"k": 3, "v": "three"}]
+    df = pd.DataFrame(data).set_index("k")
+    lake_path = str(tmp_path / "lake")
+    output_path = str(tmp_path / "output.csv")
+    write_deltalake(lake_path, df)
+
     class InputSchema(pw.Schema):
         k: int
         v: str

+    table = pw.io.deltalake.read(lake_path, schema=InputSchema)
+    pw.io.jsonlines.write(table, output_path)
     with pytest.raises(
-        ValueError,
-        match="DeltaLake reader requires explicit primary key fields specification",
+        OSError,
+        match=(
+            "Failed to connect to DeltaLake: explicit primary key specification is "
+            "required for non-append-only tables"
+        ),
     ):
-        pw.io.deltalake.read(tmp_path / "lake", schema=InputSchema)
+        pw.run()


 def test_iceberg_no_primary_key():
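The reader's decision hinges on the `delta.appendOnly` table property (see the delta.rs hunk below). A table like the fixture in these tests can be marked append-only at creation time; a sketch using the `deltalake` package, with the property name taken from the new Rust constant:

```python
import pandas as pd
from deltalake import write_deltalake

df = pd.DataFrame({"k": [1, 2, 3], "v": ["one", "two", "three"]})
# With delta.appendOnly set, the Pathway reader accepts a schema
# that declares no primary key fields.
write_deltalake("./lake", df, configuration={"delta.appendOnly": "true"})
```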
4 changes: 2 additions & 2 deletions src/connectors/data_format.rs
@@ -503,7 +503,7 @@ pub struct DsvParser {
 // more ways to represent a boolean value in the string.
 #[derive(Debug, thiserror::Error)]
 #[non_exhaustive]
-enum AdvancedBoolParseError {
+pub enum AdvancedBoolParseError {
     #[error("provided string was not parsable as a boolean value")]
     StringNotParsable,
 }
@@ -513,7 +513,7 @@ enum AdvancedBoolParseError {
 /// Related doc: `https://www.postgresql.org/docs/16/datatype-boolean.html`
 ///
 /// We also support "t", "f", "y", "n" as single-letter prefixes
-fn parse_bool_advanced(raw_value: &str) -> Result<bool, AdvancedBoolParseError> {
+pub fn parse_bool_advanced(raw_value: &str) -> Result<bool, AdvancedBoolParseError> {
     let raw_value_lowercase = raw_value.trim().to_ascii_lowercase();
     match raw_value_lowercase.as_str() {
         "true" | "yes" | "on" | "1" | "t" | "y" => Ok(true),
15 changes: 15 additions & 0 deletions src/connectors/data_lake/delta.rs
@@ -30,6 +30,7 @@ use super::{
     parquet_row_into_values_map, LakeBatchWriter, LakeWriterSettings, SPECIAL_OUTPUT_FIELDS,
 };
 use crate::async_runtime::create_async_tokio_runtime;
+use crate::connectors::data_format::parse_bool_advanced;
 use crate::connectors::data_storage::ConnectorMode;
 use crate::connectors::scanner::S3Scanner;
 use crate::connectors::{
@@ -248,6 +249,7 @@ pub struct DeltaTableReader {
     current_event_type: DataEventType,
 }

+const APPEND_ONLY_PROPERTY_NAME: &str = "delta.appendOnly";
 const DELTA_LAKE_INITIAL_POLL_DURATION: Duration = Duration::from_millis(5);
 const DELTA_LAKE_MAX_POLL_DURATION: Duration = Duration::from_millis(100);
 const DELTA_LAKE_POLL_BACKOFF: u32 = 2;
@@ -260,10 +262,23 @@ impl DeltaTableReader {
         column_types: HashMap<String, Type>,
         streaming_mode: ConnectorMode,
         start_from_timestamp_ms: Option<i64>,
+        has_primary_key: bool,
     ) -> Result<Self, ReadError> {
         let runtime = create_async_tokio_runtime()?;
         let mut table =
             runtime.block_on(async { open_delta_table(path, storage_options).await })?;
+        let table_props = &table.metadata()?.configuration;
+        let append_only_property = table_props.get(APPEND_ONLY_PROPERTY_NAME);
+        let is_append_only = {
+            if let Some(Some(append_only_property)) = append_only_property {
+                parse_bool_advanced(append_only_property).unwrap_or(false)
+            } else {
+                false
+            }
+        };
+        if !has_primary_key && !is_append_only {
+            return Err(ReadError::PrimaryKeyRequired);
+        }
         let mut current_version = table.version();
         let mut parquet_files_queue = Self::get_reader_actions(&table, path)?;

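The new check reads `delta.appendOnly` from the table's metadata configuration. The same property can be inspected from Python with the `deltalake` package (a sketch, path hypothetical); note that `parse_bool_advanced` accepts the PostgreSQL-style spellings shown in data_format.rs, e.g. "true", "yes", "on", "1", "t", "y".

```python
from deltalake import DeltaTable

# Returns e.g. {"delta.appendOnly": "true"} for an append-only table.
props = DeltaTable("./lake").metadata().configuration
print(props.get("delta.appendOnly"))
```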
3 changes: 3 additions & 0 deletions src/connectors/data_storage.rs
@@ -281,6 +281,9 @@ pub enum ReadError {

     #[error("deletion vectors in delta tables are not supported")]
     DeltaDeletionVectorsNotSupported,
+
+    #[error("explicit primary key specification is required for non-append-only tables")]
+    PrimaryKeyRequired,
 }

 #[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
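As the python_api.rs hunk below shows, `ReadError` is mapped through `PyIOError`, so the new variant surfaces in Python as an `OSError` raised when the graph runs; a hedged sketch, paths hypothetical:

```python
import pathway as pw


class PlainSchema(pw.Schema):
    k: int
    v: str


# Reading a lake that is NOT append-only, without primary key fields:
table = pw.io.deltalake.read("./mutable_lake", schema=PlainSchema)
pw.io.jsonlines.write(table, "./out.jsonl")
try:
    pw.run()  # the reader is constructed here, and the check fails
except OSError as err:
    # "Failed to connect to DeltaLake: explicit primary key
    # specification is required for non-append-only tables"
    print(err)
```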
7 changes: 1 addition & 6 deletions src/python_api.rs
@@ -4857,12 +4857,6 @@ impl DataStorage {
         data_format: &DataFormat,
         license: Option<&License>,
     ) -> PyResult<(Box<dyn ReaderBuilder>, usize)> {
-        if data_format.key_field_names.is_none() {
-            return Err(PyValueError::new_err(
-                "DeltaLake reader requires explicit primary key fields specification",
-            ));
-        }
-
         if let Some(license) = license {
             license.check_entitlements(["deltalake"])?;
         }
@@ -4874,6 +4868,7 @@ impl DataStorage {
             data_format.value_fields_type_map(py),
             self.mode,
             self.start_from_timestamp_ms,
+            data_format.key_field_names.is_some(),
         )
         .map_err(|e| PyIOError::new_err(format!("Failed to connect to DeltaLake: {e}")))?;
         Ok((Box::new(reader), 1))
1 change: 1 addition & 0 deletions tests/integration/test_deltalake.rs
@@ -106,6 +106,7 @@ fn read_with_connector(path: &str, type_: &Type) -> Result<Vec<Value>> {
         type_map,
         ConnectorMode::Static,
         None,
+        true,
     )
     .unwrap();
     let parser =
