From 6cd878e49d6a0665ad19af898f1f8400d3408039 Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Tue, 9 Jan 2024 12:07:04 -0600 Subject: [PATCH 1/7] Add drop columns to Iceberg schema evolution --- awswrangler/athena/_write_iceberg.py | 25 +++++++++++++++---- tests/unit/test_athena.py | 37 +++++++++++++++++++++++----- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 9a3b08d35..a9017d6f3 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -82,7 +82,7 @@ def _create_iceberg_table( class _SchemaChanges(TypedDict): to_add: dict[str, str] to_change: dict[str, str] - to_remove: set[str] + to_drop: set[str] def _determine_differences( @@ -109,7 +109,7 @@ def _determine_differences( new_columns = set(frame_columns_types) to_add = {col: frame_columns_types[col] for col in new_columns - original_columns} - to_remove = original_columns - new_columns + to_drop = original_columns - new_columns columns_to_change = [ col @@ -118,7 +118,7 @@ def _determine_differences( ] to_change = {col: frame_columns_types[col] for col in columns_to_change} - return _SchemaChanges(to_add=to_add, to_change=to_change, to_remove=to_remove) + return _SchemaChanges(to_add=to_add, to_change=to_change, to_drop=to_drop) def _alter_iceberg_table( @@ -146,8 +146,11 @@ def _alter_iceberg_table( columns_to_change=schema_changes["to_change"], ) - if schema_changes["to_remove"]: - raise exceptions.InvalidArgumentCombination("Removing columns of Iceberg tables is not currently supported.") + if schema_changes["to_drop"]: + sql_statements += _alter_iceberg_table_drop_columns_sql( + table=table, + columns_to_drop=schema_changes["to_drop"], + ) for statement in sql_statements: query_execution_id: str = _start_query_execution( @@ -184,6 +187,18 @@ def _alter_iceberg_table_change_columns_sql( return sql_statements +def _alter_iceberg_table_drop_columns_sql( + table: str, + columns_to_drop: set[str], +) -> list[str]: + sql_statements = [] + + for col_name in columns_to_drop: + sql_statements.append(f"ALTER TABLE {table} DROP COLUMN {col_name}") + + return sql_statements + + @apply_configs @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session", "s3_additional_kwargs"], diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index e92aa84c9..0449b715f 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -1628,10 +1628,11 @@ def test_athena_to_iceberg_schema_evolution_modify_columns( assert str(df2_out["c2"].dtype).startswith("Int64") -def test_athena_to_iceberg_schema_evolution_remove_columns_error( +def test_athena_to_iceberg_schema_evolution_remove_columns( path: str, path2: str, glue_database: str, glue_table: str ) -> None: - df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) + # Version 1 + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]}) wr.athena.to_iceberg( df=df, database=glue_database, @@ -1642,17 +1643,41 @@ def test_athena_to_iceberg_schema_evolution_remove_columns_error( schema_evolution=True, ) - df = pd.DataFrame({"c0": [6, 7, 8]}) + column_types = wr.catalog.get_table_types(glue_database, glue_table) + assert len(column_types) == len(df.columns) - with pytest.raises(wr.exceptions.InvalidArgumentCombination): + # Version 2 + df2 = pd.DataFrame({"c0": [10, 11, 12], "c1": [13, 14, 15]}) + wr.athena.to_iceberg( + df=df2, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + 
keep_files=False, + schema_evolution=True, + ) + + df2_out = wr.athena.read_sql_table( + table=glue_table, + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + assert len(df2_out) == len(df) * 2 + assert np.all(df2_out.columns == df2.columns) + + # Version 3 + df3 = pd.DataFrame({"c0": [20, 21, 22]}) + with pytest.raises(wr.exceptions.InvalidArgumentValue): wr.athena.to_iceberg( - df=df, + df=df3, database=glue_database, table=glue_table, table_location=path, temp_path=path2, keep_files=False, - schema_evolution=True, + schema_evolution=False, ) From 8b5e18fa0b5d4a0722ef27ee7c58d89ab0d0507c Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Tue, 16 Jan 2024 12:19:00 -0600 Subject: [PATCH 2/7] undo change --- awswrangler/athena/_write_iceberg.py | 17 ++----------- tests/unit/test_athena.py | 37 +++++----------------------- 2 files changed, 8 insertions(+), 46 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index a9017d6f3..b4fab1ae7 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -147,9 +147,8 @@ def _alter_iceberg_table( ) if schema_changes["to_drop"]: - sql_statements += _alter_iceberg_table_drop_columns_sql( - table=table, - columns_to_drop=schema_changes["to_drop"], + raise exceptions.InvalidArgumentCombination( + f"Dropping columns of Iceberg tables is not currently supported: {schema_changes['to_drop']}" ) for statement in sql_statements: @@ -187,18 +186,6 @@ def _alter_iceberg_table_change_columns_sql( return sql_statements -def _alter_iceberg_table_drop_columns_sql( - table: str, - columns_to_drop: set[str], -) -> list[str]: - sql_statements = [] - - for col_name in columns_to_drop: - sql_statements.append(f"ALTER TABLE {table} DROP COLUMN {col_name}") - - return sql_statements - - @apply_configs @_utils.validate_distributed_kwargs( unsupported_kwargs=["boto3_session", "s3_additional_kwargs"], diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index 0449b715f..c660fc196 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -1628,11 +1628,10 @@ def test_athena_to_iceberg_schema_evolution_modify_columns( assert str(df2_out["c2"].dtype).startswith("Int64") -def test_athena_to_iceberg_schema_evolution_remove_columns( +def test_athena_to_iceberg_schema_evolution_drop_columns_error( path: str, path2: str, glue_database: str, glue_table: str ) -> None: - # Version 1 - df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]}) + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) wr.athena.to_iceberg( df=df, database=glue_database, @@ -1643,41 +1642,17 @@ def test_athena_to_iceberg_schema_evolution_remove_columns( schema_evolution=True, ) - column_types = wr.catalog.get_table_types(glue_database, glue_table) - assert len(column_types) == len(df.columns) - - # Version 2 - df2 = pd.DataFrame({"c0": [10, 11, 12], "c1": [13, 14, 15]}) - wr.athena.to_iceberg( - df=df2, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - ) - - df2_out = wr.athena.read_sql_table( - table=glue_table, - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - assert len(df2_out) == len(df) * 2 - assert np.all(df2_out.columns == df2.columns) + df = pd.DataFrame({"c0": [6, 7, 8]}) - # Version 3 - df3 = pd.DataFrame({"c0": [20, 21, 22]}) - with pytest.raises(wr.exceptions.InvalidArgumentValue): + with 
pytest.raises(wr.exceptions.InvalidArgumentCombination): wr.athena.to_iceberg( - df=df3, + df=df, database=glue_database, table=glue_table, table_location=path, temp_path=path2, keep_files=False, - schema_evolution=False, + schema_evolution=True, ) From 74b75d5f8335adf0027764d432ff4a98466443fd Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Tue, 16 Jan 2024 13:35:39 -0600 Subject: [PATCH 3/7] add schema_fill_missing param --- awswrangler/athena/_write_iceberg.py | 15 ++++++++-- tests/unit/test_athena.py | 43 ++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index b4fab1ae7..89c6dde05 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -82,7 +82,7 @@ def _create_iceberg_table( class _SchemaChanges(TypedDict): to_add: dict[str, str] to_change: dict[str, str] - to_drop: set[str] + to_drop: dict[str, str] def _determine_differences( @@ -109,7 +109,7 @@ def _determine_differences( new_columns = set(frame_columns_types) to_add = {col: frame_columns_types[col] for col in new_columns - original_columns} - to_drop = original_columns - new_columns + to_drop = {col: catalog_column_types[col] for col in original_columns - new_columns} columns_to_change = [ col @@ -125,6 +125,7 @@ def _alter_iceberg_table( database: str, table: str, schema_changes: _SchemaChanges, + schema_fill_missing: bool, wg_config: _WorkGroupConfig, data_source: str | None = None, workgroup: str | None = None, @@ -146,7 +147,7 @@ def _alter_iceberg_table( columns_to_change=schema_changes["to_change"], ) - if schema_changes["to_drop"]: + if schema_changes["to_drop"] and not schema_fill_missing: raise exceptions.InvalidArgumentCombination( f"Dropping columns of Iceberg tables is not currently supported: {schema_changes['to_drop']}" ) @@ -210,6 +211,7 @@ def to_iceberg( dtype: dict[str, str] | None = None, catalog_id: str | None = None, schema_evolution: bool = False, + schema_fill_missing: bool = False, glue_table_settings: GlueTableSettings | None = None, ) -> None: """ @@ -362,6 +364,7 @@ def to_iceberg( database=database, table=table, schema_changes=schema_differences, + schema_fill_missing=schema_fill_missing, wg_config=wg_config, data_source=data_source, workgroup=workgroup, @@ -370,6 +373,12 @@ def to_iceberg( boto3_session=boto3_session, ) + # Add missing columns to the DataFrame + if schema_differences["to_drop"] and schema_fill_missing: + for col_name, col_type in schema_differences["to_drop"].items(): + df[col_name] = None + df[col_name] = df[col_name].astype(col_type) + # Create temporary external table, write the results s3.to_parquet( df=df, diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index c660fc196..d374d960c 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -1628,6 +1628,49 @@ def test_athena_to_iceberg_schema_evolution_modify_columns( assert str(df2_out["c2"].dtype).startswith("Int64") +def test_athena_to_iceberg_schema_evolution_fill_missing_columns( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"c0": [0, 1, 2], "c1": ["foo", "bar", "baz"]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + ) + + print(wr.catalog.table(glue_database, glue_table)) + + df = pd.DataFrame({"c0": [3, 4, 5]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, 
+ table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + schema_fill_missing=True, + ) + print(wr.catalog.table(glue_database, glue_table)) + + df_actual = wr.athena.read_sql_table( + table=glue_table, + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + df_actual = df_actual.sort_values("c0").reset_index(drop=True) + df_actual["c0"] = df_actual["c0"].astype("int64") + + df_expected = pd.DataFrame({"c0": [0, 1, 2, 3, 4, 5], "c1": ["foo", "bar", "baz", np.nan, np.nan, np.nan]}) + df_expected["c1"] = df_expected["c1"].astype("string") + + assert_pandas_equals(df_actual, df_expected) + + def test_athena_to_iceberg_schema_evolution_drop_columns_error( path: str, path2: str, glue_database: str, glue_table: str ) -> None: From 1e6d08dfd109674d3067cdeb3d0c0224aa39e3c1 Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Tue, 16 Jan 2024 13:46:49 -0600 Subject: [PATCH 4/7] rename typed dict --- awswrangler/athena/_write_iceberg.py | 39 ++++++++++++++++------------ 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 89c6dde05..73210d2c8 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -80,9 +80,9 @@ def _create_iceberg_table( class _SchemaChanges(TypedDict): - to_add: dict[str, str] - to_change: dict[str, str] - to_drop: dict[str, str] + new_columns: dict[str, str] + modified_columns: dict[str, str] + missing_columns: dict[str, str] def _determine_differences( @@ -105,20 +105,20 @@ def _determine_differences( catalog.get_table_types(database=database, table=table, catalog_id=catalog_id, boto3_session=boto3_session), ) - original_columns = set(catalog_column_types) - new_columns = set(frame_columns_types) + original_column_names = set(catalog_column_types) + new_column_names = set(frame_columns_types) - to_add = {col: frame_columns_types[col] for col in new_columns - original_columns} - to_drop = {col: catalog_column_types[col] for col in original_columns - new_columns} + new_columns = {col: frame_columns_types[col] for col in new_column_names - original_column_names} + missing_columns = {col: catalog_column_types[col] for col in original_column_names - new_column_names} columns_to_change = [ col - for col in original_columns.intersection(new_columns) + for col in original_column_names.intersection(new_column_names) if frame_columns_types[col] != catalog_column_types[col] ] to_change = {col: frame_columns_types[col] for col in columns_to_change} - return _SchemaChanges(to_add=to_add, to_change=to_change, to_drop=to_drop) + return _SchemaChanges(new_columns=new_columns, modified_columns=to_change, missing_columns=missing_columns) def _alter_iceberg_table( @@ -135,21 +135,22 @@ def _alter_iceberg_table( ) -> None: sql_statements: list[str] = [] - if schema_changes["to_add"]: + if schema_changes["new_columns"]: sql_statements += _alter_iceberg_table_add_columns_sql( table=table, - columns_to_add=schema_changes["to_add"], + columns_to_add=schema_changes["new_columns"], ) - if schema_changes["to_change"]: + if schema_changes["modified_columns"]: sql_statements += _alter_iceberg_table_change_columns_sql( table=table, - columns_to_change=schema_changes["to_change"], + columns_to_change=schema_changes["modified_columns"], ) - if schema_changes["to_drop"] and not schema_fill_missing: + if schema_changes["missing_columns"] and not schema_fill_missing: raise exceptions.InvalidArgumentCombination( - f"Dropping 
columns of Iceberg tables is not currently supported: {schema_changes['to_drop']}" + f"Dropping columns of Iceberg tables is not supported: {schema_changes['missing_columns']}. " + "Please use `schema_fill_missing=True` to fill missing columns with N/A." ) for statement in sql_statements: @@ -273,6 +274,10 @@ def to_iceberg( If none is provided, the AWS account ID is used by default schema_evolution: bool If True allows schema evolution for new columns or changes in column types. + Missing columns will throw an error unless ``schema_fill_missing`` is set to ``True``. + schema_fill_missing: bool + If True, fill missing columns with NULL values. + Only takes effect if ``schema_evolution`` is set to True. columns_comments: GlueTableSettings, optional Glue/Athena catalog: Settings for writing to the Glue table. Currently only the 'columns_comments' attribute is supported for this function. @@ -374,8 +379,8 @@ def to_iceberg( ) # Add missing columns to the DataFrame - if schema_differences["to_drop"] and schema_fill_missing: - for col_name, col_type in schema_differences["to_drop"].items(): + if schema_differences["missing_columns"] and schema_fill_missing: + for col_name, col_type in schema_differences["missing_columns"].items(): df[col_name] = None df[col_name] = df[col_name].astype(col_type) From 4dab1b485d8339d799a15d3d368f64d0e80d8623 Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Tue, 16 Jan 2024 13:48:28 -0600 Subject: [PATCH 5/7] fix type --- awswrangler/athena/_write_iceberg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 73210d2c8..2eace14bf 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -27,7 +27,7 @@ def _create_iceberg_table( df: pd.DataFrame, database: str, table: str, - path: str, + path: str | None, wg_config: _WorkGroupConfig, partition_cols: list[str] | None, additional_table_properties: dict[str, Any] | None, @@ -338,7 +338,7 @@ def to_iceberg( df=df, database=database, table=table, - path=table_location, # type: ignore[arg-type] + path=table_location, wg_config=wg_config, partition_cols=partition_cols, additional_table_properties=additional_table_properties, From dbbe4802b0a7ec620f4db19b47b0d54e56247e52 Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Tue, 16 Jan 2024 13:52:06 -0600 Subject: [PATCH 6/7] fix inconsistent var names --- awswrangler/athena/_write_iceberg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 2eace14bf..21c2c4063 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -116,9 +116,9 @@ def _determine_differences( for col in original_column_names.intersection(new_column_names) if frame_columns_types[col] != catalog_column_types[col] ] - to_change = {col: frame_columns_types[col] for col in columns_to_change} + modified_columns = {col: frame_columns_types[col] for col in columns_to_change} - return _SchemaChanges(new_columns=new_columns, modified_columns=to_change, missing_columns=missing_columns) + return _SchemaChanges(new_columns=new_columns, modified_columns=modified_columns, missing_columns=missing_columns) def _alter_iceberg_table( From ddf312d02aab656b2ec60a039f24586c2a9186e8 Mon Sep 17 00:00:00 2001 From: Leon Luttenberger Date: Thu, 18 Jan 2024 11:17:51 -0600 Subject: [PATCH 7/7] rename param and fix df ordering --- 
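Notes for reviewers: besides renaming `schema_fill_missing` to `fill_missing_columns_in_df`
(which now defaults to ``True``), this patch moves the backfill step ahead of the write and
reorders the DataFrame, because the final INSERT INTO resolves columns by position rather
than by name. A condensed sketch of the logic added in the hunks below (`catalog_cols` is
the list of catalog column names now returned by `_determine_differences`):

    # Backfill catalog columns absent from the DataFrame with typed NULLs
    if fill_missing_columns_in_df and schema_differences["missing_columns"]:
        for col_name, col_type in schema_differences["missing_columns"].items():
            df[col_name] = None
            df[col_name] = df[col_name].astype(_data_types.athena2pandas(col_type))
        schema_differences["missing_columns"] = {}

    # INSERT INTO matches columns positionally, so mirror the catalog ordering
    df = df[catalog_cols]
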
awswrangler/athena/_write_iceberg.py | 52 ++--
 tests/unit/test_athena.py | 359 ------------------
 tests/unit/test_athena_iceberg.py | 393 +++++++++++++++++++++++++++
 3 files changed, 425 insertions(+), 379 deletions(-)
 create mode 100644 tests/unit/test_athena_iceberg.py

diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py
index 21c2c4063..7aff96aa7 100644
--- a/awswrangler/athena/_write_iceberg.py
+++ b/awswrangler/athena/_write_iceberg.py
@@ -94,7 +94,7 @@ def _determine_differences(
     boto3_session: boto3.Session | None,
     dtype: dict[str, str] | None,
     catalog_id: str | None,
-) -> _SchemaChanges:
+) -> tuple[_SchemaChanges, list[str]]:
     frame_columns_types, frame_partitions_types = _data_types.athena_types_from_pandas_partitioned(
         df=df, index=index, partition_cols=partition_cols, dtype=dtype
     )
@@ -118,14 +118,17 @@
     ]
     modified_columns = {col: frame_columns_types[col] for col in columns_to_change}

-    return _SchemaChanges(new_columns=new_columns, modified_columns=modified_columns, missing_columns=missing_columns)
+    return (
+        _SchemaChanges(new_columns=new_columns, modified_columns=modified_columns, missing_columns=missing_columns),
+        list(catalog_column_types),
+    )


 def _alter_iceberg_table(
     database: str,
     table: str,
     schema_changes: _SchemaChanges,
-    schema_fill_missing: bool,
+    fill_missing_columns_in_df: bool,
     wg_config: _WorkGroupConfig,
     data_source: str | None = None,
     workgroup: str | None = None,
@@ -147,10 +150,10 @@
             columns_to_change=schema_changes["modified_columns"],
         )

-    if schema_changes["missing_columns"] and not schema_fill_missing:
+    if schema_changes["missing_columns"] and not fill_missing_columns_in_df:
         raise exceptions.InvalidArgumentCombination(
             f"Dropping columns of Iceberg tables is not supported: {schema_changes['missing_columns']}. "
-            "Please use `schema_fill_missing=True` to fill missing columns with N/A."
+            "Please use `fill_missing_columns_in_df=True` to fill missing columns with N/A."
         )

     for statement in sql_statements:
@@ -212,7 +215,7 @@ def to_iceberg(
     dtype: dict[str, str] | None = None,
     catalog_id: str | None = None,
     schema_evolution: bool = False,
-    schema_fill_missing: bool = False,
+    fill_missing_columns_in_df: bool = True,
     glue_table_settings: GlueTableSettings | None = None,
 ) -> None:
     """
@@ -272,12 +275,14 @@
     catalog_id : str, optional
         The ID of the Data Catalog from which to retrieve Databases.
         If none is provided, the AWS account ID is used by default
-    schema_evolution: bool
-        If True allows schema evolution for new columns or changes in column types.
-        Missing columns will throw an error unless ``schema_fill_missing`` is set to ``True``.
-    schema_fill_missing: bool
-        If True, fill missing columns with NULL values.
-        Only takes effect if ``schema_evolution`` is set to True.
+    schema_evolution: bool, optional
+        If ``True``, allows schema evolution for new columns or changes in column types.
+        Columns that are present in the Iceberg schema but missing from the DataFrame
+        raise an error unless ``fill_missing_columns_in_df`` is set to ``True``.
+        Default is ``False``.
+    fill_missing_columns_in_df: bool, optional
+        If ``True``, fill columns that are missing in the DataFrame with ``NULL`` values.
+        Default is ``True``.
     columns_comments: GlueTableSettings, optional
         Glue/Athena catalog: Settings for writing to the Glue table.
         Currently only the 'columns_comments' attribute is supported for this function.
@@ -352,7 +357,7 @@ def to_iceberg( columns_comments=glue_table_settings.get("columns_comments"), ) else: - schema_differences = _determine_differences( + schema_differences, catalog_cols = _determine_differences( df=df, database=database, table=table, @@ -362,6 +367,19 @@ def to_iceberg( dtype=dtype, catalog_id=catalog_id, ) + + # Add missing columns to the DataFrame + if fill_missing_columns_in_df and schema_differences["missing_columns"]: + for col_name, col_type in schema_differences["missing_columns"].items(): + df[col_name] = None + df[col_name] = df[col_name].astype(_data_types.athena2pandas(col_type)) + + schema_differences["missing_columns"] = {} + + # Ensure that the ordering of the DF is the same as in the catalog. + # This is required for the INSERT command to work. + df = df[catalog_cols] + if schema_evolution is False and any([schema_differences[x] for x in schema_differences]): # type: ignore[literal-required] raise exceptions.InvalidArgumentValue(f"Schema change detected: {schema_differences}") @@ -369,7 +387,7 @@ def to_iceberg( database=database, table=table, schema_changes=schema_differences, - schema_fill_missing=schema_fill_missing, + fill_missing_columns_in_df=fill_missing_columns_in_df, wg_config=wg_config, data_source=data_source, workgroup=workgroup, @@ -378,12 +396,6 @@ def to_iceberg( boto3_session=boto3_session, ) - # Add missing columns to the DataFrame - if schema_differences["missing_columns"] and schema_fill_missing: - for col_name, col_type in schema_differences["missing_columns"].items(): - df[col_name] = None - df[col_name] = df[col_name].astype(col_type) - # Create temporary external table, write the results s3.to_parquet( df=df, diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index d374d960c..e2d2b8352 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -25,7 +25,6 @@ get_df_txt, get_time_str_with_random_suffix, pandas_equals, - ts, ) logging.getLogger("awswrangler").setLevel(logging.DEBUG) @@ -1487,361 +1486,3 @@ def test_athena_date_recovery(path, glue_database, glue_table): ctas_approach=False, ) assert pandas_equals(df, df2) - - -@pytest.mark.parametrize("partition_cols", [None, ["name"], ["name", "day(ts)"]]) -@pytest.mark.parametrize( - "additional_table_properties", - [None, {"write_target_data_file_size_bytes": 536870912, "optimize_rewrite_delete_file_threshold": 10}], -) -def test_athena_to_iceberg(path, path2, glue_database, glue_table, partition_cols, additional_table_properties): - df = pd.DataFrame( - { - "id": [1, 2, 3], - "name": ["a", "b", "c"], - "ts": [ts("2020-01-01 00:00:00.0"), ts("2020-01-02 00:00:01.0"), ts("2020-01-03 00:00:00.0")], - } - ) - df["id"] = df["id"].astype("Int64") # Cast as nullable int64 type - df["name"] = df["name"].astype("string") - - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - partition_cols=partition_cols, - additional_table_properties=additional_table_properties, - ) - - df_out = wr.athena.read_sql_query( - sql=f'SELECT * FROM "{glue_table}" ORDER BY id', - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - - assert df.equals(df_out) - - -def test_athena_to_iceberg_schema_evolution_add_columns( - path: str, path2: str, glue_database: str, glue_table: str -) -> None: - df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - 
schema_evolution=True, - ) - - df["c2"] = [6, 7, 8] - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - ) - - column_types = wr.catalog.get_table_types(glue_database, glue_table) - assert len(column_types) == len(df.columns) - - df_out = wr.athena.read_sql_table( - table=glue_table, - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - assert len(df_out) == len(df) * 2 - - df["c3"] = [9, 10, 11] - with pytest.raises(wr.exceptions.InvalidArgumentValue): - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=False, - ) - - -def test_athena_to_iceberg_schema_evolution_modify_columns( - path: str, path2: str, glue_database: str, glue_table: str -) -> None: - # Version 1 - df = pd.DataFrame({"c1": pd.Series([1.0, 2.0], dtype="float32"), "c2": pd.Series([-1, -2], dtype="int32")}) - - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - ) - - df_out = wr.athena.read_sql_table( - table=glue_table, - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - - assert len(df_out) == 2 - assert len(df_out.columns) == 2 - assert str(df_out["c1"].dtype).startswith("float32") - assert str(df_out["c2"].dtype).startswith("Int32") - - # Version 2 - df2 = pd.DataFrame({"c1": pd.Series([3.0, 4.0], dtype="float64"), "c2": pd.Series([-3, -4], dtype="int64")}) - - wr.athena.to_iceberg( - df=df2, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - ) - - df2_out = wr.athena.read_sql_table( - table=glue_table, - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - - assert len(df2_out) == 4 - assert len(df2_out.columns) == 2 - assert str(df2_out["c1"].dtype).startswith("float64") - assert str(df2_out["c2"].dtype).startswith("Int64") - - -def test_athena_to_iceberg_schema_evolution_fill_missing_columns( - path: str, path2: str, glue_database: str, glue_table: str -) -> None: - df = pd.DataFrame({"c0": [0, 1, 2], "c1": ["foo", "bar", "baz"]}) - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - ) - - print(wr.catalog.table(glue_database, glue_table)) - - df = pd.DataFrame({"c0": [3, 4, 5]}) - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - schema_fill_missing=True, - ) - print(wr.catalog.table(glue_database, glue_table)) - - df_actual = wr.athena.read_sql_table( - table=glue_table, - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - df_actual = df_actual.sort_values("c0").reset_index(drop=True) - df_actual["c0"] = df_actual["c0"].astype("int64") - - df_expected = pd.DataFrame({"c0": [0, 1, 2, 3, 4, 5], "c1": ["foo", "bar", "baz", np.nan, np.nan, np.nan]}) - df_expected["c1"] = df_expected["c1"].astype("string") - - assert_pandas_equals(df_actual, df_expected) - - -def test_athena_to_iceberg_schema_evolution_drop_columns_error( - path: str, path2: str, glue_database: str, glue_table: str -) -> None: - df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) - wr.athena.to_iceberg( - df=df, - 
database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - ) - - df = pd.DataFrame({"c0": [6, 7, 8]}) - - with pytest.raises(wr.exceptions.InvalidArgumentCombination): - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - schema_evolution=True, - ) - - -def test_to_iceberg_cast(path, path2, glue_table, glue_database): - df = pd.DataFrame( - { - "c0": [ - datetime.date(4000, 1, 1), - datetime.datetime(2000, 1, 1, 10), - "2020", - "2020-01", - 1, - None, - pd.NA, - pd.NaT, - np.nan, - np.inf, - ] - } - ) - df_expected = pd.DataFrame( - { - "c0": [ - datetime.date(1970, 1, 1), - datetime.date(2000, 1, 1), - datetime.date(2020, 1, 1), - datetime.date(2020, 1, 1), - datetime.date(4000, 1, 1), - None, - None, - None, - None, - None, - ] - } - ) - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - dtype={"c0": "date"}, - ) - df2 = wr.athena.read_sql_table(database=glue_database, table=glue_table, ctas_approach=False) - assert pandas_equals(df_expected, df2.sort_values("c0").reset_index(drop=True)) - - -def test_athena_to_iceberg_with_hyphenated_table_name( - path: str, path2: str, glue_database: str, glue_table_with_hyphenated_name: str -): - df = pd.DataFrame({"c0": [1, 2, 3, 4], "c1": ["foo", "bar", "baz", "boo"]}) - df["c0"] = df["c0"].astype("int") - df["c1"] = df["c1"].astype("string") - - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table_with_hyphenated_name, - table_location=path, - temp_path=path2, - keep_files=False, - ) - - df_out = wr.athena.read_sql_query( - sql=f'SELECT * FROM "{glue_table_with_hyphenated_name}"', - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - - assert len(df) == len(df_out) - assert len(df.columns) == len(df_out.columns) - - -def test_athena_to_iceberg_column_comments(path: str, path2: str, glue_database: str, glue_table: str) -> None: - df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) - column_comments = { - "c0": "comment 0", - "c1": "comment 1", - } - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - glue_table_settings={ - "columns_comments": column_comments, - }, - ) - - column_comments_actual = wr.catalog.get_columns_comments(glue_database, glue_table) - - assert column_comments_actual == column_comments - - -def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str, glue_table: str) -> None: - df = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["1984", "1996"], "gross": [35_000_000, 60_000_000]}) - df["title"] = df["title"].astype("string") - df["year"] = df["year"].astype("string") - df["gross"] = df["gross"].astype("Int64") - - wr.athena.to_iceberg( - df=df, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - ) - - # Perform MERGE INTO - df2 = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["2021", "1996"], "gross": [400_000_000, 60_000_001]}) - df2["title"] = df2["title"].astype("string") - df2["year"] = df2["year"].astype("string") - df2["gross"] = df2["gross"].astype("Int64") - - wr.athena.to_iceberg( - df=df2, - database=glue_database, - table=glue_table, - table_location=path, - temp_path=path2, - keep_files=False, - merge_cols=["title", "year"], - ) - - # Expected output - df_expected 
= pd.DataFrame( - { - "title": ["Dune", "Fargo", "Dune"], - "year": ["1984", "1996", "2021"], - "gross": [35_000_000, 60_000_001, 400_000_000], - } - ) - df_expected["title"] = df_expected["title"].astype("string") - df_expected["year"] = df_expected["year"].astype("string") - df_expected["gross"] = df_expected["gross"].astype("Int64") - - df_out = wr.athena.read_sql_query( - sql=f'SELECT * FROM "{glue_table}" ORDER BY year', - database=glue_database, - ctas_approach=False, - unload_approach=False, - ) - - assert_pandas_equals(df_expected, df_out) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py new file mode 100644 index 000000000..c243822d7 --- /dev/null +++ b/tests/unit/test_athena_iceberg.py @@ -0,0 +1,393 @@ +from __future__ import annotations + +import datetime +import logging +from typing import Any + +import numpy as np +import pytest + +import awswrangler as wr +import awswrangler.pandas as pd + +from .._utils import ( + assert_pandas_equals, + pandas_equals, + ts, +) + +logging.getLogger("awswrangler").setLevel(logging.DEBUG) + +pytestmark = pytest.mark.distributed + + +@pytest.mark.parametrize("partition_cols", [None, ["name"], ["name", "day(ts)"]]) +@pytest.mark.parametrize( + "additional_table_properties", + [None, {"write_target_data_file_size_bytes": 536870912, "optimize_rewrite_delete_file_threshold": 10}], +) +def test_athena_to_iceberg( + path: str, + path2: str, + glue_database: str, + glue_table: str, + partition_cols: list[str] | None, + additional_table_properties: dict[str, Any] | None, +) -> None: + df = pd.DataFrame( + { + "id": [1, 2, 3], + "name": ["a", "b", "c"], + "ts": [ts("2020-01-01 00:00:00.0"), ts("2020-01-02 00:00:01.0"), ts("2020-01-03 00:00:00.0")], + } + ) + df["id"] = df["id"].astype("Int64") # Cast as nullable int64 type + df["name"] = df["name"].astype("string") + + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + partition_cols=partition_cols, + additional_table_properties=additional_table_properties, + ) + + df_out = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table}" ORDER BY id', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert df.equals(df_out) + + +def test_athena_to_iceberg_schema_evolution_add_columns( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + ) + + df["c2"] = [6, 7, 8] + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + ) + + column_types = wr.catalog.get_table_types(glue_database, glue_table) + assert len(column_types) == len(df.columns) + + df_out = wr.athena.read_sql_table( + table=glue_table, + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + assert len(df_out) == len(df) * 2 + + df["c3"] = [9, 10, 11] + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=False, + fill_missing_columns_in_df=False, + ) + + +def test_athena_to_iceberg_schema_evolution_modify_columns( + path: str, path2: str, glue_database: str, glue_table: str +) 
-> None: + # Version 1 + df = pd.DataFrame({"c1": pd.Series([1.0, 2.0], dtype="float32"), "c2": pd.Series([-1, -2], dtype="int32")}) + + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + ) + + df_out = wr.athena.read_sql_table( + table=glue_table, + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert len(df_out) == 2 + assert len(df_out.columns) == 2 + assert str(df_out["c1"].dtype).startswith("float32") + assert str(df_out["c2"].dtype).startswith("Int32") + + # Version 2 + df2 = pd.DataFrame({"c1": pd.Series([3.0, 4.0], dtype="float64"), "c2": pd.Series([-3, -4], dtype="int64")}) + + wr.athena.to_iceberg( + df=df2, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + ) + + df2_out = wr.athena.read_sql_table( + table=glue_table, + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert len(df2_out) == 4 + assert len(df2_out.columns) == 2 + assert str(df2_out["c1"].dtype).startswith("float64") + assert str(df2_out["c2"].dtype).startswith("Int64") + + +@pytest.mark.parametrize("schema_evolution", [False, True]) +def test_athena_to_iceberg_schema_evolution_fill_missing_columns( + path: str, path2: str, glue_database: str, glue_table: str, schema_evolution: bool +) -> None: + df = pd.DataFrame({"c0": [0, 1, 2], "c1": ["foo", "bar", "baz"], "c2": [10, 11, 12]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + ) + + df = pd.DataFrame({"c0": [3, 4, 5]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=schema_evolution, + fill_missing_columns_in_df=True, + ) + + df_actual = wr.athena.read_sql_table( + table=glue_table, + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + df_actual = df_actual.sort_values("c0").reset_index(drop=True) + df_actual["c0"] = df_actual["c0"].astype("int64") + + df_expected = pd.DataFrame( + { + "c0": [0, 1, 2, 3, 4, 5], + "c1": ["foo", "bar", "baz", None, None, None], + "c2": [10, 11, 12, None, None, None], + }, + ) + df_expected["c1"] = df_expected["c1"].astype("string") + df_expected["c2"] = df_expected["c2"].astype("Int64") + + assert_pandas_equals(df_actual, df_expected) + + +def test_athena_to_iceberg_schema_evolution_drop_columns_error( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + ) + + df = pd.DataFrame({"c0": [6, 7, 8]}) + + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + schema_evolution=True, + fill_missing_columns_in_df=False, + ) + + +def test_to_iceberg_cast(path: str, path2: str, glue_table: str, glue_database: str) -> None: + df = pd.DataFrame( + { + "c0": [ + datetime.date(4000, 1, 1), + datetime.datetime(2000, 1, 1, 10), + "2020", + "2020-01", + 1, + None, + pd.NA, + pd.NaT, + np.nan, + np.inf, + ] + } + ) + df_expected = pd.DataFrame( + { + "c0": [ + 
datetime.date(1970, 1, 1), + datetime.date(2000, 1, 1), + datetime.date(2020, 1, 1), + datetime.date(2020, 1, 1), + datetime.date(4000, 1, 1), + None, + None, + None, + None, + None, + ] + } + ) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + dtype={"c0": "date"}, + ) + df2 = wr.athena.read_sql_table(database=glue_database, table=glue_table, ctas_approach=False) + assert pandas_equals(df_expected, df2.sort_values("c0").reset_index(drop=True)) + + +def test_athena_to_iceberg_with_hyphenated_table_name( + path: str, path2: str, glue_database: str, glue_table_with_hyphenated_name: str +) -> None: + df = pd.DataFrame({"c0": [1, 2, 3, 4], "c1": ["foo", "bar", "baz", "boo"]}) + df["c0"] = df["c0"].astype("int") + df["c1"] = df["c1"].astype("string") + + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table_with_hyphenated_name, + table_location=path, + temp_path=path2, + keep_files=False, + ) + + df_out = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table_with_hyphenated_name}"', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert len(df) == len(df_out) + assert len(df.columns) == len(df_out.columns) + + +def test_athena_to_iceberg_column_comments(path: str, path2: str, glue_database: str, glue_table: str) -> None: + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}) + column_comments = { + "c0": "comment 0", + "c1": "comment 1", + } + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + glue_table_settings={ + "columns_comments": column_comments, + }, + ) + + column_comments_actual = wr.catalog.get_columns_comments(glue_database, glue_table) + + assert column_comments_actual == column_comments + + +def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str, glue_table: str) -> None: + df = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["1984", "1996"], "gross": [35_000_000, 60_000_000]}) + df["title"] = df["title"].astype("string") + df["year"] = df["year"].astype("string") + df["gross"] = df["gross"].astype("Int64") + + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + ) + + # Perform MERGE INTO + df2 = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["2021", "1996"], "gross": [400_000_000, 60_000_001]}) + df2["title"] = df2["title"].astype("string") + df2["year"] = df2["year"].astype("string") + df2["gross"] = df2["gross"].astype("Int64") + + wr.athena.to_iceberg( + df=df2, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + merge_cols=["title", "year"], + ) + + # Expected output + df_expected = pd.DataFrame( + { + "title": ["Dune", "Fargo", "Dune"], + "year": ["1984", "1996", "2021"], + "gross": [35_000_000, 60_000_001, 400_000_000], + } + ) + df_expected["title"] = df_expected["title"].astype("string") + df_expected["year"] = df_expected["year"].astype("string") + df_expected["gross"] = df_expected["gross"].astype("Int64") + + df_out = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table}" ORDER BY year', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + + assert_pandas_equals(df_expected, df_out)
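
End-to-end behavior after this series, as a minimal sketch (the S3 paths and Glue
names below are placeholders, not part of the patches):

    import pandas as pd
    import awswrangler as wr

    # First write creates the Iceberg table with columns c0 and c1.
    wr.athena.to_iceberg(
        df=pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]}),
        database="my_db",
        table="my_table",
        table_location="s3://my-bucket/my_table/",
        temp_path="s3://my-bucket/tmp/",
    )

    # A later write may omit c1: with the new default
    # fill_missing_columns_in_df=True, c1 is backfilled with NULLs
    # and the frame is reordered to match the catalog schema.
    wr.athena.to_iceberg(
        df=pd.DataFrame({"c0": [2, 3]}),
        database="my_db",
        table="my_table",
        table_location="s3://my-bucket/my_table/",
        temp_path="s3://my-bucket/tmp/",
    )

    # Opting out restores the old failure mode: with schema_evolution=True and
    # fill_missing_columns_in_df=False, the same write raises
    # wr.exceptions.InvalidArgumentCombination, since dropping Iceberg table
    # columns remains unsupported.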