Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass append to execute_data_load #209

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions plaidcloud/utilities/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,9 @@
#
# self._load_csv(table_object, path)

def bulk_insert_dataframe(self, table_object, df, append=False, chunk_size=500000):
"""Pandas-flavored wrapper method to the SQLAlchemy bulk_save_objects
bulk_insert_mappings(mapper, mappings, return_defaults=False, render_nulls=False)
def bulk_insert_dataframe(self, table_object: 'Table', df: pd.DataFrame, append: bool = False, chunk_size: int = 500000):

Check warning on line 428 in plaidcloud/utilities/query.py

View workflow job for this annotation

GitHub Actions / PyLint

[PyLint] plaidcloud/utilities/query.py#L428

R0914: Too many local variables (22/15) (too-many-locals)
Raw output
plaidcloud/utilities/query.py:428:4: R0914: Too many local variables (22/15) (too-many-locals)

Check warning on line 428 in plaidcloud/utilities/query.py

View workflow job for this annotation

GitHub Actions / PyLint

[PyLint] plaidcloud/utilities/query.py#L428

R0912: Too many branches (15/12) (too-many-branches)
Raw output
plaidcloud/utilities/query.py:428:4: R0912: Too many branches (15/12) (too-many-branches)
"""Pandas-flavored wrapper method to the load data into PlaidCloud Table from a Dataframe
bulk_insert_dataframe(table_object, df, append, chunk_size)
"""
if len(df) == 0:
logger.debug('Empty dataframe - nothing to insert')
Expand Down Expand Up @@ -455,7 +455,7 @@

table_meta_out = None
if append:
# order dataframe according to existing structure
# order dataframe according to the existing structure

# create any missing columns
for col in cols_missing:
Expand All @@ -471,7 +471,7 @@
col_order = cols_overwrite

else:
# match order according to existing structure, adding new cols to the end of the table
# match order according to the existing structure, adding new cols to the end of the table
col_order = cols_overwrite

if not table_meta_out:
Expand Down Expand Up @@ -514,6 +514,7 @@
meta=table_meta_out,
load_type=data_load['load_type'],
upload_path=data_load['upload_path'],
append=append,
)
else:
# Do it the old way
Expand Down
10 changes: 9 additions & 1 deletion plaidcloud/utilities/sqlalchemy_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,15 @@ def compile_import_cast_databend(element, compiler, **kw):
elif dtype == 'smallint':
return compiler.process(func.to_int16(col))
elif dtype == 'numeric':
return compiler.process(func.cast(col, sqlalchemy.Numeric(38, 10)))
return compiler.process(
func.cast(
sqlalchemy.case(
(func.to_string(col) == 'NaN', None),
else_=col,
),
sqlalchemy.Numeric(38, 10),
)
)
else:
#if dtype == 'text':
return compiler.process(col, **kw)
Expand Down
3 changes: 2 additions & 1 deletion plaidcloud/utilities/tests/test_sqlalchemy_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@
def test_import_col_numeric(self):
expr = sqlalchemy.func.import_col('Column1', 'numeric', 'YYYY-MM-DD', False)
compiled = expr.compile(dialect=self.eng.dialect, compile_kwargs={"render_postcompile": True})
self.assertEqual(str(compiled), 'CASE WHEN (regexp_replace(%(import_col_1)s, %(regexp_replace_1)s, %(regexp_replace_2)s) = %(regexp_replace_3)s) THEN %(param_1)s ELSE CAST(%(import_col_1)s AS DECIMAL(38, 10)) END')
self.assertEqual(str(compiled), 'CASE WHEN (regexp_replace(%(import_col_1)s, %(regexp_replace_1)s, %(regexp_replace_2)s) = %(regexp_replace_3)s) THEN %(param_1)s ELSE CAST(CASE WHEN (to_string(%(import_col_1)s) = %(to_string_1)s) THEN NULL ELSE %(import_col_1)s END AS DECIMAL(38, 10)) END')

Check warning on line 74 in plaidcloud/utilities/tests/test_sqlalchemy_functions.py

View workflow job for this annotation

GitHub Actions / PyLint

[PyLint] plaidcloud/utilities/tests/test_sqlalchemy_functions.py#L74

C0301: Line too long (299/125) (line-too-long)
Raw output
plaidcloud/utilities/tests/test_sqlalchemy_functions.py:74:0: C0301: Line too long (299/125) (line-too-long)
self.assertEqual('Column1', compiled.params['import_col_1'])
self.assertEqual('\\s*', compiled.params['regexp_replace_1'])
self.assertEqual('', compiled.params['regexp_replace_2'])
self.assertEqual('', compiled.params['regexp_replace_3'])
self.assertEqual(0.0, compiled.params['param_1'])
self.assertEqual('NaN', compiled.params['to_string_1'])

def test_import_col_interval(self):
expr = sqlalchemy.func.import_col('Column1', 'interval', 'YYYY-MM-DD', False)
Expand Down
Loading