diff --git a/CHANGES.md b/CHANGES.md index 40bf45b..990d93c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,8 @@ `check_uniqueness_factory` - Added `table_kwargs` context manager to enable pandas/Dask to support CrateDB dialect table options. +- Fixed SQL rendering of special DDL table options in `CrateDDLCompiler`. + Before, configuring `crate_"translog.durability"` was not possible. ## 2024/06/13 0.37.0 - Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying diff --git a/src/sqlalchemy_cratedb/compiler.py b/src/sqlalchemy_cratedb/compiler.py index d9e3bb7..f2bf9a5 100644 --- a/src/sqlalchemy_cratedb/compiler.py +++ b/src/sqlalchemy_cratedb/compiler.py @@ -100,13 +100,13 @@ def crate_before_execute(conn, clauseelement, multiparams, params, *args, **kwar class CrateDDLCompiler(compiler.DDLCompiler): __special_opts_tmpl = { - 'PARTITIONED_BY': ' PARTITIONED BY ({0})' + 'partitioned_by': ' PARTITIONED BY ({0})' } __clustered_opts_tmpl = { - 'NUMBER_OF_SHARDS': ' INTO {0} SHARDS', - 'CLUSTERED_BY': ' BY ({0})', + 'number_of_shards': ' INTO {0} SHARDS', + 'clustered_by': ' BY ({0})', } - __clustered_opt_tmpl = ' CLUSTERED{CLUSTERED_BY}{NUMBER_OF_SHARDS}' + __clustered_opt_tmpl = ' CLUSTERED{clustered_by}{number_of_shards}' def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) + " " + \ @@ -162,7 +162,7 @@ def post_create_table(self, table): table_opts = [] opts = dict( - (k[len(self.dialect.name) + 1:].upper(), v) + (k[len(self.dialect.name) + 1:], v) for k, v, in table.kwargs.items() if k.startswith('%s_' % self.dialect.name) ) diff --git a/tests/create_table_test.py b/tests/create_table_test.py index f74c45e..5f80604 100644 --- a/tests/create_table_test.py +++ b/tests/create_table_test.py @@ -154,7 +154,7 @@ class DummyTable(self.Base): ('\nCREATE TABLE t (\n\t' 'pk STRING NOT NULL, \n\t' 'PRIMARY KEY (pk)\n' - ') CLUSTERED INTO 3 SHARDS WITH (NUMBER_OF_REPLICAS = 2)\n\n'), + ') CLUSTERED INTO 3 SHARDS WITH (number_of_replicas = 2)\n\n'), ()) def test_table_clustered_by_and_number_of_shards(self): @@ -175,6 +175,21 @@ class DummyTable(self.Base): ') CLUSTERED BY (p) INTO 3 SHARDS\n\n'), ()) + def test_table_translog_durability(self): + class DummyTable(self.Base): + __tablename__ = 't' + __table_args__ = { + 'crate_"translog.durability"': "'async'", + } + pk = sa.Column(sa.String, primary_key=True) + self.Base.metadata.create_all(bind=self.engine) + fake_cursor.execute.assert_called_with( + ('\nCREATE TABLE t (\n\t' + 'pk STRING NOT NULL, \n\t' + 'PRIMARY KEY (pk)\n' + """) WITH ("translog.durability" = 'async')\n\n"""), + ()) + def test_column_object_array(self): class DummyTable(self.Base): __tablename__ = 't' diff --git a/tests/test_support_pandas.py b/tests/test_support_pandas.py index 1fa43a1..4e12af3 100644 --- a/tests/test_support_pandas.py +++ b/tests/test_support_pandas.py @@ -2,7 +2,6 @@ import sys import pytest -import sqlalchemy as sa from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import sessionmaker @@ -42,10 +41,7 @@ def test_table_kwargs_partitioned_by(cratedb_service): cratedb_service.database.refresh_table(TABLE_NAME) # Inquire table cardinality. - metadata = sa.MetaData() - query = sa.select(sa.func.count()).select_from(sa.Table(TABLE_NAME, metadata)) - results = session.execute(query) - count = results.scalar() + count = cratedb_service.database.count_records(TABLE_NAME) # Compare outcome. assert count == INSERT_RECORDS @@ -55,6 +51,38 @@ def test_table_kwargs_partitioned_by(cratedb_service): assert 'PARTITIONED BY ("time")' in ddl[0][0] +@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier") +@pytest.mark.skipif(SA_VERSION < SA_2_0, reason="Feature not supported on SQLAlchemy 1.4 and earlier") +def test_table_kwargs_translog_durability(cratedb_service): + """ + Validate adding CrateDB dialect table option `translog.durability` at runtime. + """ + + engine = cratedb_service.database.engine + + # Insert records from pandas dataframe. + with table_kwargs(**{'crate_"translog.durability"': "'async'"}): + df.to_sql( + TABLE_NAME, + engine, + if_exists="replace", + index=False, + ) + + # Synchronize writes. + cratedb_service.database.refresh_table(TABLE_NAME) + + # Inquire table cardinality. + count = cratedb_service.database.count_records(TABLE_NAME) + + # Compare outcome. + assert count == INSERT_RECORDS + + # Validate SQL DDL. + ddl = cratedb_service.database.run_sql(f"SHOW CREATE TABLE {TABLE_NAME}") + assert """"translog.durability" = 'ASYNC'""" in ddl[0][0] + + @pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier") @pytest.mark.skipif(SA_VERSION < SA_2_0, reason="Feature not supported on SQLAlchemy 1.4 and earlier") def test_table_kwargs_unknown(cratedb_service):