Explicitly set Delta table props to accommodate for different defaults [databricks] #11970

Merged 11 commits on Jan 31, 2025

integration_tests/src/main/python/delta_lake_utils.py (34 changes: 15 additions & 19 deletions)
@@ -1,4 +1,4 @@
-# Copyright (c) 2023, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
 import os.path
 import re
 
-from spark_session import is_databricks122_or_later
+from spark_session import is_databricks122_or_later, supports_delta_lake_deletion_vectors
 
 delta_meta_allow = [
     "DeserializeToObjectExec",
@@ -33,7 +33,7 @@
 
 delta_writes_enabled_conf = {"spark.rapids.sql.format.delta.write.enabled": "true"}
 
-delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec,WriteFilesExec" if is_databricks122_or_later() else "ExecutedCommandExec"
+delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec,WriteFilesExec,DeltaInvariantCheckerExec" if is_databricks122_or_later() else "ExecutedCommandExec"
 delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
 
 delta_optimized_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec,DeltaOptimizedWriterExec,WriteFilesExec" if is_databricks122_or_later() else "ExecutedCommandExec"
@@ -155,25 +155,21 @@ def schema_to_ddl(spark, schema):
 
 def setup_delta_dest_table(spark, path, dest_table_func, use_cdf, partition_columns=None, enable_deletion_vectors=False):
     dest_df = dest_table_func(spark)
-    writer = dest_df.write.format("delta")
+    # append to SQL-created table
+    writer = dest_df.write.format("delta").mode("append")
     ddl = schema_to_ddl(spark, dest_df.schema)
     table_properties = {}
-    if use_cdf:
-        table_properties['delta.enableChangeDataFeed'] = 'true'
-    if enable_deletion_vectors:
-        table_properties['delta.enableDeletionVectors'] = 'true'
-    if len(table_properties) > 0:
-        # if any table properties are specified then we need to use SQL to define the table
-        sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
-        if partition_columns:
-            sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
-        properties = ', '.join(key + ' = ' + value for key, value in table_properties.items())
-        sql_text += " TBLPROPERTIES ({})".format(properties)
-        spark.sql(sql_text)
-    elif partition_columns:
+    table_properties['delta.enableChangeDataFeed'] = str(use_cdf).lower()
+    if supports_delta_lake_deletion_vectors():
+        table_properties['delta.enableDeletionVectors'] = str(enable_deletion_vectors).lower()
+    # if any table properties are specified then we need to use SQL to define the table
sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
if partition_columns:
sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
writer = writer.partitionBy(*partition_columns)
if use_cdf or enable_deletion_vectors:
writer = writer.mode("append")
properties = ', '.join(key + ' = ' + value for key, value in table_properties.items())
sql_text += " TBLPROPERTIES ({})".format(properties)
spark.sql(sql_text)
writer.save(path)
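For illustration, a minimal sketch of the statement the reworked helper assembles. The path, DDL, and argument values below are hypothetical and not taken from this PR; the point is that both properties are always spelled out explicitly, so cluster-level defaults no longer decide what the test table looks like.

# Hypothetical inputs, for illustration only.
path = "/tmp/delta_dest_example"
ddl = "a INT, b STRING"
use_cdf = False
enable_deletion_vectors = False
partition_columns = ["b"]

# Same assembly as the helper above, assuming deletion vectors are supported on the cluster.
table_properties = {
    'delta.enableChangeDataFeed': str(use_cdf).lower(),
    'delta.enableDeletionVectors': str(enable_deletion_vectors).lower(),
}
sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
sql_text += " TBLPROPERTIES ({})".format(
    ', '.join(k + ' = ' + v for k, v in table_properties.items()))
print(sql_text)
# One line: CREATE TABLE delta.`/tmp/delta_dest_example` (a INT, b STRING) USING DELTA
#   PARTITIONED BY (b) TBLPROPERTIES (delta.enableChangeDataFeed = false, delta.enableDeletionVectors = false)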

Collaborator:
Saving right after you have already created a table will cause a DELTA_PATH_EXISTS error to be thrown

Collaborator (Author):
This is handled by the append mode

Collaborator:
Nevermind, I see that you are setting the mode to append
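For context, a minimal sketch of the create-then-append pattern discussed here (hypothetical path and schema, not part of the PR, and assuming a Delta-enabled Spark session): the CREATE TABLE registers the Delta table with explicit properties first, and the subsequent save succeeds only because the writer mode is append; with the default error-if-exists mode it would fail with the path-already-exists error mentioned above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

path = "/tmp/delta_append_example"  # hypothetical location, for illustration only
spark.sql(
    "CREATE TABLE delta.`{path}` (a BIGINT) USING DELTA "
    "TBLPROPERTIES (delta.enableChangeDataFeed = false, "
    "delta.enableDeletionVectors = false)".format(path=path))

df = spark.range(10).withColumnRenamed("id", "a")

# Appends into the table created above; without .mode("append") the default
# "errorifexists" mode would refuse to write because the path already holds a Delta table.
df.write.format("delta").mode("append").save(path)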

def setup_delta_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_columns=None, enable_deletion_vectors=False):