Make Polars a first-class citizen for pulling and loading data to Snowflake #295

Merged: 17 commits, Sep 5, 2024
23 changes: 15 additions & 8 deletions locopy/database.py
@@ -18,6 +18,9 @@

import time

import pandas
import polars

from locopy.errors import CredentialsError, DBError
from locopy.logger import INFO, get_logger
from locopy.utility import read_config_yaml
@@ -188,25 +191,25 @@ def column_names(self):
except Exception:
return [column[0].lower() for column in self.cursor.description]

def to_dataframe(self, size=None):
def to_dataframe(self, df_type="pandas", size=None):
"""Return a dataframe of the last query results.

Pandas is imported here so that it is not required for other use cases. This is just a
convenience method.

Parameters
----------
df_type: Literal["pandas","polars"], optional
Output dataframe format. Defaults to pandas.

size : int, optional
Chunk size to fetch. Defaults to None.

Returns
-------
pandas.DataFrame
pandas.DataFrame or polars.DataFrame
Dataframe with lowercase column names. Returns None if no fetched
result.
"""
import pandas

if df_type not in ["pandas", "polars"]:
raise ValueError("df_type must be ``pandas`` or ``polars``.")
columns = self.column_names()

if size is None:
@@ -220,7 +223,11 @@

if len(fetched) == 0:
return None
return pandas.DataFrame(fetched, columns=columns)

if df_type == "pandas":
return pandas.DataFrame(fetched, columns=columns)
elif df_type == "polars":
return polars.DataFrame(fetched, schema=columns, orient="row")

def to_dict(self):
"""Generate dictionaries of rows.
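A minimal usage sketch of the new df_type argument on to_dataframe (assuming conn is an already-opened locopy connection object, e.g. a locopy.Redshift or locopy.Snowflake instance; the query, schema, and column names are placeholders):

    # conn is assumed to be an open locopy connection (a Database subclass)
    conn.execute("SELECT order_id, amount FROM my_schema.orders LIMIT 100")
    pdf = conn.to_dataframe()                   # default: pandas.DataFrame

    conn.execute("SELECT order_id, amount FROM my_schema.orders LIMIT 100")
    pldf = conn.to_dataframe(df_type="polars")  # polars.DataFrame

    # any other value raises ValueError("df_type must be ``pandas`` or ``polars``.")

The query is re-executed before the second fetch because to_dataframe consumes the cursor's pending result set.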
65 changes: 54 additions & 11 deletions locopy/redshift.py
@@ -21,8 +21,13 @@
"""

import os
from functools import singledispatch
from pathlib import Path

import pandas as pd
import polars as pl
import polars.selectors as cs

from locopy.database import Database
from locopy.errors import DBError, S3CredentialsError
from locopy.logger import INFO, get_logger
@@ -537,14 +542,14 @@ def insert_dataframe_to_table(
verbose=False,
):
"""
Insert a Pandas dataframe to an existing table or a new table.
Insert a Pandas or Polars dataframe to an existing table or a new table.

`executemany` in psycopg2 and pg8000 has very poor performance in terms of running speed.
To overcome this issue, we instead format the insert query and then run `execute`.

Parameters
----------
dataframe: Pandas Dataframe
dataframe: pandas.DataFrame or polars.DataFrame
The pandas or polars dataframe which needs to be inserted.

table_name: str
@@ -567,8 +572,6 @@


"""
import pandas as pd

if columns:
dataframe = dataframe[columns]

@@ -599,9 +602,15 @@
self.execute(create_query)
logger.info("New table has been created")

logger.info("Inserting records...")
for start in range(0, len(dataframe), batch_size):
# create a list of tuples for insert
# create a list of tuples for insert
@singledispatch
def get_insert_tuple(dataframe, start, batch_size):
"""Create a list of tuples for insert."""
pass

@get_insert_tuple.register(pd.DataFrame)
def get_insert_tuple_pandas(dataframe: pd.DataFrame, start, batch_size):
"""Create a list of tuples for insert when dataframe is pd.DataFrame."""
to_insert = []
for row in dataframe[start : (start + batch_size)].itertuples(index=False):
none_row = (
@@ -617,9 +626,43 @@ def insert_dataframe_to_table(
+ ")"
)
to_insert.append(none_row)
string_join = ", ".join(to_insert)
insert_query = (
f"""INSERT INTO {table_name} {column_sql} VALUES {string_join}"""
return to_insert

@get_insert_tuple.register(pl.DataFrame)
def get_insert_tuple_polars(dataframe: pl.DataFrame, start, batch_size):
"""Create a list of tuples for insert when dataframe is pl.DataFrame."""
to_insert = []
dataframe = dataframe.with_columns(
dataframe.select(cs.numeric().fill_nan(None))
)
self.execute(insert_query, verbose=verbose)
for row in dataframe[start : (start + batch_size)].iter_rows():
none_row = (
"("
+ ", ".join(
[
"NULL"
if val is None
else "'" + str(val).replace("'", "''") + "'"
for val in row
]
)
+ ")"
)
to_insert.append(none_row)
return to_insert

logger.info("Inserting records...")
try:
for start in range(0, len(dataframe), batch_size):
to_insert = get_insert_tuple(dataframe, start, batch_size)
string_join = ", ".join(to_insert)
insert_query = (
f"""INSERT INTO {table_name} {column_sql} VALUES {string_join}"""
)
self.execute(insert_query, verbose=verbose)
except TypeError:
raise TypeError(
"DataFrame to insert must either be a pandas.DataFrame or polars.DataFrame."
) from None

logger.info("Table insertion has completed")
67 changes: 54 additions & 13 deletions locopy/snowflake.py
@@ -21,8 +21,13 @@
"""

import os
from functools import singledispatch
from pathlib import PurePath

import pandas as pd
import polars as pl
import polars.selectors as cs

from locopy.database import Database
from locopy.errors import DBError, S3CredentialsError
from locopy.logger import INFO, get_logger
@@ -396,7 +401,7 @@ def unload(
def insert_dataframe_to_table(
self, dataframe, table_name, columns=None, create=False, metadata=None
):
"""Insert a Pandas dataframe to an existing table or a new table.
"""Insert a Pandas or Polars dataframe to an existing table or a new table.

In newer versions of the
python snowflake connector (v2.1.2+) users can call the ``write_pandas`` method from the cursor
@@ -408,8 +413,8 @@

Parameters
----------
dataframe: Pandas Dataframe
The pandas dataframe which needs to be inserted.
dataframe: Pandas or Polars Dataframe
The pandas or polars dataframe which needs to be inserted.

table_name: str
The name of the Snowflake table which is being inserted.
@@ -423,8 +428,6 @@
metadata: dictionary, optional
If metadata==None, it will be generated based on data
"""
import pandas as pd

if columns:
dataframe = dataframe[columns]

@@ -433,10 +436,39 @@
string_join = "(" + ",".join(["%s"] * len(all_columns)) + ")"

# create a list of tuples for insert
to_insert = []
for row in dataframe.itertuples(index=False):
none_row = tuple(None if pd.isnull(val) else str(val) for val in row)
to_insert.append(none_row)
@singledispatch
def get_insert_tuple(dataframe):
"""Create a list of tuples for insert."""
pass

@get_insert_tuple.register(pd.DataFrame)
def get_insert_tuple_pandas(dataframe: pd.DataFrame):
"""Create a list of tuples for insert when dataframe is pd.DataFrame."""
to_insert = []
for row in dataframe.itertuples(index=False):
none_row = tuple(None if pd.isnull(val) else str(val) for val in row)
to_insert.append(none_row)
return to_insert

@get_insert_tuple.register(pl.DataFrame)
def get_insert_tuple_polars(dataframe: pl.DataFrame):
"""Create a list of tuples for insert when dataframe is pl.DataFrame."""
to_insert = []
dataframe = dataframe.with_columns(
dataframe.select(cs.numeric().fill_nan(None))
)
for row in dataframe.iter_rows():
none_row = tuple(None if val is None else str(val) for val in row)
to_insert.append(none_row)
return to_insert

# create a list of tuples for insert
try:
to_insert = get_insert_tuple(dataframe)
except TypeError:
raise TypeError(
"DataFrame to insert must either be a pandas.DataFrame or polars.DataFrame."
) from None

if not create and metadata:
logger.warning("Metadata will not be used because create is set to False.")
Expand Down Expand Up @@ -468,7 +500,7 @@ def insert_dataframe_to_table(
self.execute(insert_query, params=to_insert, many=True)
logger.info("Table insertion has completed")

def to_dataframe(self, size=None):
def to_dataframe(self, df_type="pandas", size=None):
"""Return a dataframe of the last query results.

This is just a convenience method. This
@@ -479,16 +511,25 @@

Parameters
----------
df_type: Literal["pandas","polars"], optional
Output dataframe format. Defaults to pandas.

size : int, optional
Chunk size to fetch. Defaults to None.

Returns
-------
pandas.DataFrame
pandas.DataFrame or polars.DataFrame
Dataframe with lowercase column names. Returns None if no fetched
result.
"""
if df_type not in ["pandas", "polars"]:
raise ValueError("df_type must be ``pandas`` or ``polars``.")

if size is None and self.cursor._query_result_format == "arrow":
return self.cursor.fetch_pandas_all()
if df_type == "pandas":
return self.cursor.fetch_pandas_all()
elif df_type == "polars":
return pl.from_arrow(self.cursor.fetch_arrow_all())
else:
return super().to_dataframe(size)
return super().to_dataframe(df_type=df_type, size=size)
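Putting the pieces together for Snowflake, a hedged end-to-end sketch (the dbapi/config_yaml construction follows the usage pattern in the project's README; the YAML path, schema, and table names are placeholders):

    import polars as pl
    import snowflake.connector

    import locopy

    df = pl.DataFrame({"order_id": [1, 2, 3], "amount": [9.99, None, 4.50]})

    # snowflake.yml is a placeholder credentials file
    with locopy.Snowflake(dbapi=snowflake.connector, config_yaml="snowflake.yml") as sf:
        # load the polars frame; with create=True and no metadata, the table
        # definition is generated from the data
        sf.insert_dataframe_to_table(df, "my_schema.orders", create=True)

        # read it back as pandas (default) or polars
        sf.execute("SELECT * FROM my_schema.orders")
        orders = sf.to_dataframe(df_type="polars")

Because to_dataframe defaults to df_type="pandas", existing callers keep getting a pandas.DataFrame unless they opt in to polars.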