From f919c26a818f80c96a603700b93c0e3803a15483 Mon Sep 17 00:00:00 2001 From: "Amir.Ba" Date: Tue, 24 Sep 2024 16:17:34 +0300 Subject: [PATCH 1/2] Fix error file not found. tmp file is deleted before inserting rows to DB in VerticaToMySQLOperator bulk . --- .../mysql/transfers/vertica_to_mysql.py | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/airflow/providers/mysql/transfers/vertica_to_mysql.py b/airflow/providers/mysql/transfers/vertica_to_mysql.py index fd196315d790..9130f1f37038 100644 --- a/airflow/providers/mysql/transfers/vertica_to_mysql.py +++ b/airflow/providers/mysql/transfers/vertica_to_mysql.py @@ -133,21 +133,20 @@ def _bulk_load_transfer(self, mysql, vertica): count += 1 tmpfile.flush() - self._run_preoperator(mysql) - try: - self.log.info("Bulk inserting rows into MySQL...") - with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor: - cursor.execute( - f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " - f"INTO TABLE {self.mysql_table} " - f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})" - ) - conn.commit() - tmpfile.close() - self.log.info("Inserted rows into MySQL %s", count) - except (MySQLdb.Error, MySQLdb.Warning): - self.log.info("Inserted rows into MySQL 0") - raise + self._run_preoperator(mysql) + try: + self.log.info("Bulk inserting rows into MySQL...") + with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor: + cursor.execute( + f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " + f"INTO TABLE {self.mysql_table} " + f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})" + ) + conn.commit() + self.log.info("Inserted rows into MySQL %s", count) + except (MySQLdb.Error, MySQLdb.Warning): + self.log.info("Inserted rows into MySQL 0") + raise def _run_preoperator(self, mysql): if self.mysql_preoperator: From fa3248284a87135f0a5f415fa929efd35e946a47 Mon Sep 17 00:00:00 2001 From: amirbareket Date: Mon, 11 Nov 2024 14:36:32 +0200 Subject: [PATCH 2/2] fix the mock_get_conn function , so it will really mock the data and will return empty results --- .../mysql/transfers/test_vertica_to_mysql.py | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/tests/providers/mysql/transfers/test_vertica_to_mysql.py b/tests/providers/mysql/transfers/test_vertica_to_mysql.py index 7656a036449f..5b4051ff6a1c 100644 --- a/tests/providers/mysql/transfers/test_vertica_to_mysql.py +++ b/tests/providers/mysql/transfers/test_vertica_to_mysql.py @@ -31,17 +31,26 @@ def mock_get_conn(): + + class MockCol: + def __init__(self, name): + self.name = name + + col_a = MockCol(name="a") + col_b = MockCol(name="b") + col_c = MockCol(name="c") + commit_mock = mock.MagicMock() cursor_mock = mock.MagicMock( - execute=[], - fetchall=[["1", "2", "3"]], - description=["a", "b", "c"], - iterate=[["1", "2", "3"]], - ) - conn_mock = mock.MagicMock( - commit=commit_mock, - cursor=cursor_mock, + description = [col_a, col_b, col_c] ) + cursor_mock.execute.return_value=[] + cursor_mock.fetchall.return_value=[["1", "2", "3"]] + cursor_mock.iterate.return_value=[["1", "2", "3"]] + conn_mock = mock.MagicMock() + conn_mock.commit.return_value = commit_mock + conn_mock.cursor.return_value = cursor_mock + return conn_mock