Skip to content

Commit

Permalink
Fix most formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Jul 29, 2023
1 parent 5081171 commit f250bab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
6 changes: 5 additions & 1 deletion py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3315,7 +3315,11 @@ def write_delta(
... ) # doctest: +SKIP
"""
from polars.io.delta import _check_if_delta_available, _resolve_delta_lake_uri, _create_delta_compatible_schema
from polars.io.delta import (
_check_if_delta_available,
_create_delta_compatible_schema,
_resolve_delta_lake_uri,
)

_check_if_delta_available()

Expand Down
75 changes: 47 additions & 28 deletions py-polars/polars/io/delta.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from __future__ import annotations

import pyarrow as pa
from functools import reduce
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
from functools import reduce

import pyarrow as pa

from polars.convert import from_arrow
from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
from polars.io.pyarrow_dataset import scan_pyarrow_dataset


if TYPE_CHECKING:
from polars import DataFrame, LazyFrame

Expand Down Expand Up @@ -319,48 +319,62 @@ def _check_if_delta_available() -> None:


def _reconstruct_field_type(
field: pa.Field, field_head: pa.Field, reconstructed_field=None
field: pa.Field,
field_head: pa.Field,
reconstructed_field: list | None = None,
) -> pa.Field:
"""Recursive function that traverses through pyArrow fields to change timestamps to US and reconstructing it later.
"""
Recursive function that traverses through pyArrow fields.
Args:
field (pa.Field): _description_
field_head (pa.Field): field head to retain column names
reconstructed_field (List, optional): . Defaults to None.
Parameters
----------
field : pa.Field
field to analyze
field_head : pa.Field
field head to retain column names
reconstructed_field : List, optional
Collection of dtypes to reconstruct a field, by default None
Returns:
pa.Field: Reconstructed pyArrow field
Returns
-------
pa.Field
Reconstructed field
"""
if isinstance(field.type, pa.TimestampType):
if reconstructed_field is None:
return pa.field(
name=field.name,
type=pa.timestamp("us"
# , tz=field.type.tz
),
type=pa.timestamp(
"us"
# , tz=field.type.tz
),
)
else:
reconstructed_field.append(pa.timestamp("us",
# tz=field.type.tz
))
reconstructed_field.append(
pa.timestamp(
"us",
# tz=field.type.tz
)
)
return pa.field(
name=field_head.name,
type=reduce(lambda x, y: y(x), reversed(reconstructed_field)),
)
elif 'uint' in str(field.type) and not any(isinstance(field.type, dtype) for dtype in [pa.StructType, pa.LargeListType]):
elif "uint" in str(field.type) and not any(
isinstance(field.type, dtype) for dtype in [pa.StructType, pa.LargeListType]
):
if reconstructed_field is None:
return pa.field(
name=field.name,
type=getattr(pa, str(field.type).strip('u'))()
name=field.name, type=getattr(pa, str(field.type).strip("u"))()
)
else:
reconstructed_field.append(getattr(pa, str(field.type).strip('u'))())
reconstructed_field.append(getattr(pa, str(field.type).strip("u"))())
return pa.field(
name=field_head.name,
type=reduce(lambda x, y: y(x), reversed(reconstructed_field)),
)
elif isinstance(field.type, pa.LargeListType) and (
"timestamp" in str(field.type) or 'uint' in str(field.type)
"timestamp" in str(field.type) or "uint" in str(field.type)
): # some checks to just skip traversing non timestamps and US timestamps
if reconstructed_field is None:
reconstructed_field = [pa.large_list]
Expand Down Expand Up @@ -391,15 +405,20 @@ def _reconstruct_field_type(


def _create_delta_compatible_schema(schema: pa.schema) -> pa.Schema:
    """
    Make a DataFrame schema compatible with the Delta Lake protocol.

    Every field is passed through ``_reconstruct_field_type`` (with itself as
    the field head) so that unsupported dtypes — e.g. non-microsecond
    timestamps and unsigned integers — are rewritten to Delta-compatible
    equivalents. The original schema metadata is preserved.

    Parameters
    ----------
    schema : pa.schema
        Input schema.

    Returns
    -------
    pa.Schema
        Delta-compatible schema.
    """
    # Each field acts as its own "head" so nested reconstruction keeps names.
    compatible_fields = [_reconstruct_field_type(field, field) for field in schema]
    return pa.schema(compatible_fields, metadata=schema.metadata)

0 comments on commit f250bab

Please sign in to comment.