Make Polars a first class citizen for pulling and loading data to snowflake #295

Merged: 17 commits, Sep 5, 2024
19 changes: 14 additions & 5 deletions locopy/database.py
@@ -188,25 +188,26 @@ def column_names(self):
except Exception:
return [column[0].lower() for column in self.cursor.description]

def to_dataframe(self, size=None):
def to_dataframe(self, df_type="pandas", size=None):
"""Return a dataframe of the last query results.

This is just a convenience method; the dataframe library is imported inside the
method so that it's not needed for other use cases.

Parameters
----------
df_type: Literal["pandas","polars"], optional
Output dataframe format. Defaults to pandas.

size : int, optional
Chunk size to fetch. Defaults to None.

Returns
-------
pandas.DataFrame
pandas.DataFrame or polars.DataFrame
Dataframe with lowercase column names. Returns None if no fetched
result.
"""
import pandas

columns = self.column_names()

if size is None:
@@ -220,7 +221,15 @@

if len(fetched) == 0:
return None
return pandas.DataFrame(fetched, columns=columns)

if df_type == "pandas":
import pandas

return pandas.DataFrame(fetched, columns=columns)
elif df_type == "polars":
import polars

return polars.DataFrame(fetched, schema=columns, orient="row")

def to_dict(self):
"""Generate dictionaries of rows.
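Not part of the diff: a rough usage sketch of the new df_type switch, assuming a
working locopy Snowflake connection; the config file and query are placeholders.

import snowflake.connector

import locopy

with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="config.yml") as cnxn:
    cnxn.execute("SELECT * FROM my_schema.my_table LIMIT 100")
    pdf = cnxn.to_dataframe()                   # pandas.DataFrame (default)
    pldf = cnxn.to_dataframe(df_type="polars")  # polars.DataFrame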
83 changes: 67 additions & 16 deletions locopy/redshift.py
@@ -537,14 +537,14 @@ def insert_dataframe_to_table(
verbose=False,
):
"""
Insert a Pandas dataframe to an existing table or a new table.
Insert a Pandas or Polars dataframe to an existing table or a new table.

`executemany` in psycopg2 and pg8000 has very poor runtime performance.
To overcome this issue, we instead format the insert query and then run `execute`.

Parameters
----------
dataframe: Pandas Dataframe
dataframe: pandas.DataFrame, polars.DataFrame or polars.LazyFrame
The dataframe which needs to be inserted.

table_name: str
@@ -568,9 +568,14 @@

"""
import pandas as pd
import polars as pl
import polars.selectors as cs

if columns:
dataframe = dataframe[columns]
try:
dataframe = dataframe[columns]
except TypeError:
dataframe = dataframe.select(columns) # for polars lazyframe

all_columns = columns or list(dataframe.columns)
column_sql = "(" + ",".join(all_columns) + ")"
@@ -600,23 +605,69 @@ def insert_dataframe_to_table(
logger.info("New table has been created")

logger.info("Inserting records...")
for start in range(0, len(dataframe), batch_size):
try:
length = len(dataframe)
except TypeError:
length = dataframe.select(pl.len()).collect().item() # for polars lazyframe
Contributor:
Same comment here on LazyFrame, if we want to consider not supporting it.

Same comment here on considering a dispatch helper method to build to_insert.


for start in range(0, length, batch_size):
# create a list of tuples for insert
to_insert = []
for row in dataframe[start : (start + batch_size)].itertuples(index=False):
    none_row = (
        "("
        + ", ".join(
            [
                "NULL"
                if pd.isnull(val)
                else "'" + str(val).replace("'", "''") + "'"
                for val in row
            ]
        )
        + ")"
    )
    to_insert.append(none_row)
if isinstance(dataframe, pd.DataFrame):
    for row in dataframe[start : (start + batch_size)].itertuples(
        index=False
    ):
        none_row = (
            "("
            + ", ".join(
                [
                    "NULL"
                    if pd.isnull(val)
                    else "'" + str(val).replace("'", "''") + "'"
                    for val in row
                ]
            )
            + ")"
        )
        to_insert.append(none_row)
elif isinstance(dataframe, pl.DataFrame):
    # NaN is not null in polars; convert NaN in numeric columns first
    dataframe = dataframe.with_columns(
        dataframe.select(cs.numeric().fill_nan(None))
    )
    for row in dataframe[start : (start + batch_size)].iter_rows():
        none_row = (
            "("
            + ", ".join(
                [
                    "NULL"
                    if val is None
                    else "'" + str(val).replace("'", "''") + "'"
                    for val in row
                ]
            )
            + ")"
        )
        to_insert.append(none_row)
elif isinstance(dataframe, pl.LazyFrame):
    # slice takes (offset, length): pass batch_size, not start + batch_size
    for row in (
        dataframe.slice(start, batch_size)
        .fill_nan(None)
        .collect()
        .iter_rows()
    ):
        none_row = (
            "("
            + ", ".join(
                [
                    "NULL"
                    if val is None
                    else "'" + str(val).replace("'", "''") + "'"
                    for val in row
                ]
            )
            + ")"
        )
        to_insert.append(none_row)
string_join = ", ".join(to_insert)
insert_query = (
f"""INSERT INTO {table_name} {column_sql} VALUES {string_join}"""
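A standalone illustration (not in the diff) of the row-formatting rule shared by
all three branches above: missing values render as NULL, and every other value is
stringified with single quotes doubled for SQL escaping.

row = ("O'Brien", None, 42)  # sample data only
none_row = (
    "("
    + ", ".join(
        "NULL" if val is None else "'" + str(val).replace("'", "''") + "'"
        for val in row
    )
    + ")"
)
print(none_row)  # ('O''Brien', NULL, '42')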
48 changes: 36 additions & 12 deletions locopy/snowflake.py
@@ -396,7 +396,7 @@ def unload(
def insert_dataframe_to_table(
self, dataframe, table_name, columns=None, create=False, metadata=None
):
"""Insert a Pandas dataframe to an existing table or a new table.
"""Insert a Pandas or Polars dataframe to an existing table or a new table.

In newer versions of the
python snowflake connector (v2.1.2+) users can call the ``write_pandas`` method from the cursor
@@ -408,8 +408,8 @@ def insert_dataframe_to_table(

Parameters
----------
dataframe: Pandas Dataframe
The pandas dataframe which needs to be inserted.
dataframe: Pandas or Polars Dataframe
The pandas or polars dataframe which needs to be inserted.

table_name: str
The name of the Snowflake table which is being inserted.
@@ -424,20 +424,36 @@
If metadata==None, it will be generated based on data
"""
import pandas as pd
import polars as pl
import polars.selectors as cs

if columns:
    try:
        dataframe = dataframe[columns]
    except TypeError:
        dataframe = dataframe.select(columns)

Contributor:
Looking at the method, would it be possible to use a single dispatch helper here for the first part of this function? E.g. a dispatched function for lines 431 to 456, creating the to_insert. Once we have the list of tuples, the rest is the same.

Contributor Author (gladysteh99):
Yes, it is possible. My only concern was readability for future developers, but I'm happy to refactor it into a dispatched function if there is any significant upside.

Contributor:
I think it's more readable to use the isinstance checks below to select columns from the dataframe. I don't think anything in between requires this to be done first. That, or doing the dispatch. Let me know if I'm wrong here. Like moving the dataframe = dataframe[columns] into the pandas branch and the .select(columns) into the polars branch.

Contributor Author (gladysteh99, Sep 4, 2024):
Actually, a polars dataframe can read dataframe[columns] as of version 1.0.0; the .select(columns) was intended for LazyFrame. Since we are not supporting LazyFrame now, it can be removed completely.

Contributor:
Sounds good to me.
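For reference, one shape the helper discussed above could take; this is a
hypothetical sketch, and the name _rows_to_insert is illustrative rather than
code from this PR.

from functools import singledispatch

import pandas as pd
import polars as pl

@singledispatch
def _rows_to_insert(dataframe):
    # Fallback for unsupported input types.
    raise TypeError(f"unsupported dataframe type: {type(dataframe)}")

@_rows_to_insert.register(pd.DataFrame)
def _(dataframe):
    return [
        tuple(None if pd.isnull(val) else str(val) for val in row)
        for row in dataframe.itertuples(index=False)
    ]

@_rows_to_insert.register(pl.DataFrame)
def _(dataframe):
    # fill_nan(None) turns float NaN into proper nulls before row iteration
    return [
        tuple(None if val is None else str(val) for val in row)
        for row in dataframe.fill_nan(None).iter_rows()
    ]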

all_columns = columns or list(dataframe.columns)
column_sql = "(" + ",".join(all_columns) + ")"
string_join = "(" + ",".join(["%s"] * len(all_columns)) + ")"

# create a list of tuples for insert
to_insert = []
for row in dataframe.itertuples(index=False):
none_row = tuple(None if pd.isnull(val) else str(val) for val in row)
to_insert.append(none_row)

if isinstance(dataframe, pd.DataFrame):
for row in dataframe.itertuples(index=False):
none_row = tuple(None if pd.isnull(val) else str(val) for val in row)
to_insert.append(none_row)
elif isinstance(dataframe, pl.DataFrame):
dataframe = dataframe.with_columns(
dataframe.select(cs.numeric().fill_nan(None))
)
for row in dataframe.iter_rows():
none_row = tuple(None if val is None else str(val) for val in row)
to_insert.append(none_row)
elif isinstance(dataframe, pl.LazyFrame):
for row in dataframe.fill_nan(None).collect().iter_rows():
none_row = tuple(None if val is None else str(val) for val in row)
to_insert.append(none_row)
if not create and metadata:
logger.warning("Metadata will not be used because create is set to False.")

@@ -468,7 +484,7 @@ def insert_dataframe_to_table(
self.execute(insert_query, params=to_insert, many=True)
logger.info("Table insertion has completed")

def to_dataframe(self, size=None):
def to_dataframe(self, df_type="pandas", size=None):
"""Return a dataframe of the last query results.

This is just a convenience method. This
@@ -479,16 +495,24 @@

Parameters
----------
df_type: Literal["pandas","polars"], optional
Output dataframe format. Defaults to pandas.

size : int, optional
Chunk size to fetch. Defaults to None.

Returns
-------
pandas.DataFrame
pandas.DataFrame or polars.DataFrame
Dataframe with lowercase column names. Returns None if no fetched
result.
"""
if size is None and self.cursor._query_result_format == "arrow":
return self.cursor.fetch_pandas_all()
if df_type == "pandas":
return self.cursor.fetch_pandas_all()
elif df_type == "polars":
import polars as pl

return pl.from_arrow(self.cursor.fetch_arrow_all())
else:
return super().to_dataframe(size)
return super().to_dataframe(df_type=df_type, size=size)
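Again not part of the diff: a sketch of the polars paths end to end, assuming sf
is an already-connected locopy.Snowflake instance; the table and column names are
made up.

import polars as pl

df = pl.DataFrame({"id": [1, 2, 3], "amount": [9.99, None, 12.5]})

# sf is assumed to be a connected locopy.Snowflake instance.
sf.insert_dataframe_to_table(
    df,
    table_name="analytics.orders",
    create=True,  # metadata is None, so column types are inferred from the data
)

sf.execute("SELECT * FROM analytics.orders")
result = sf.to_dataframe(df_type="polars")  # arrow fetch -> polars.DataFrame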
91 changes: 89 additions & 2 deletions locopy/utility.py
@@ -25,8 +25,14 @@
import sys
import threading
from collections import OrderedDict

# https://docs.python.org/3/library/functools.html#functools.singledispatch
from functools import singledispatch
from itertools import cycle
from typing import Union

import pandas as pd
import polars as pl
import yaml

from locopy.errors import (
Expand Down Expand Up @@ -253,7 +259,14 @@ def read_config_yaml(config_yaml):


# make it more granular, eg. include length
@singledispatch
def find_column_type(dataframe, warehouse_type: str):
"""Find data type of each column from the dataframe."""
pass


@find_column_type.register(pd.DataFrame)
def find_column_type_pandas(dataframe: pd.DataFrame, warehouse_type: str):
"""
Find data type of each column from the dataframe.

@@ -284,8 +297,6 @@ def find_column_type(dataframe, warehouse_type: str):
"""
import re

import pandas as pd

def validate_date_object(column):
try:
pd.to_datetime(column)
@@ -342,6 +353,82 @@ def validate_float_object(column):
return OrderedDict(zip(list(dataframe.columns), column_type))


@find_column_type.register(pl.DataFrame)
@find_column_type.register(pl.LazyFrame)
def find_column_type_polars(
dataframe: Union[pl.DataFrame, pl.LazyFrame], warehouse_type: str
):
"""
Find data type of each column from the dataframe.

The following is the list of polars data types that the function checks, and their mapping in SQL:

- Boolean -> boolean
- Date/Datetime/Duration/Time -> timestamp
- int -> int
- float/decimal -> float
- float object -> float
- datetime object -> timestamp
- others -> varchar

For all other data types, the column will be mapped to varchar type.

Parameters
----------
dataframe : Polars dataframe or lazyframe

warehouse_type: str
Required to properly determine format of uploaded data, either "snowflake" or "redshift".

Returns
-------
dict
A dictionary of columns with their data type
"""

def validate_date_object(column):
try:
column.str.to_datetime()
return "date"
except Exception:
return None

def validate_float_object(column):
try:
column.cast(pl.UInt32)
return "float"
except Exception:
return None

if warehouse_type.lower() not in ["snowflake", "redshift"]:
raise ValueError(
'warehouse_type argument must be either "snowflake" or "redshift"'
)

column_type = []
for column in dataframe.collect_schema().names():
logger.debug("Checking column: %s", column)
data = dataframe.lazy().select(column).drop_nulls().collect().to_series()
if data.shape[0] == 0:
column_type.append("varchar")
elif data.dtype.is_temporal():
column_type.append("timestamp")
elif str(data.dtype).lower().startswith("bool"):
column_type.append("boolean")
elif data.dtype.is_integer():
column_type.append("int")
elif data.dtype.is_numeric(): # cast all non-integer numeric as float
column_type.append("float")
else:
data_type = validate_float_object(data) or validate_date_object(data)
if not data_type:
column_type.append("varchar")
else:
column_type.append(data_type)
logger.info("Parsing column %s to %s", column, column_type[-1])
return OrderedDict(zip(list(dataframe.columns), column_type))
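# Illustrative only, not part of the diff: exercising the polars dispatch with a
# small throwaway frame, assuming this branch of locopy is installed.
from datetime import datetime

import polars as pl

from locopy.utility import find_column_type

df = pl.DataFrame(
    {
        "flag": [True, False],
        "qty": [1, 2],
        "price": [1.5, 2.5],
        "ts": [datetime(2024, 9, 1), datetime(2024, 9, 2)],
        "label": ["a", "b"],
    }
)

print(find_column_type(df, "snowflake"))
# Expected from the mapping documented above:
# OrderedDict([('flag', 'boolean'), ('qty', 'int'), ('price', 'float'),
#              ('ts', 'timestamp'), ('label', 'varchar')])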


class ProgressPercentage:
"""ProgressPercentage class is used by the S3Transfer upload_file callback.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ authors = [
{ name="Faisal Dosani", email="[email protected]" },
]
license = {text = "Apache Software License"}
dependencies = ["boto3<=1.35.9,>=1.9.92", "PyYAML<=6.0.1,>=5.1", "pandas<=2.2.2,>=0.25.2", "numpy<=2.0.2,>=1.22.0"]
dependencies = ["boto3<=1.35.9,>=1.9.92", "PyYAML<=6.0.1,>=5.1", "pandas<=2.2.2,>=0.25.2", "numpy<=2.0.2,>=1.22.0", "polars>=1.5.0"]
Contributor:
Is 1.5 actually the minimum version that works? I think it's worth looking into this, as I think people are still using versions before 1.0.

Contributor Author (gladysteh99, Sep 4, 2024):
The minimum version needed is 1.0, because of one of the methods used (collect_schema), which is necessary for supporting both lazyframe and dataframe. It really comes down to whether we want to support both. I personally prefer setting it to at least 1.0.0 because of the significant upgrades from 0.20.0 to 1.0.0.


requires-python = ">=3.9.0"
classifiers = [