Skip to content

Commit

Permalink
Merge pull request #1 from bareketamir/fix-vertica-to-mysql-operator
Browse files Browse the repository at this point in the history
Fix error file not found. tmp file is deleted before inserting rows to DB in VerticaToMySQLOperator bulk .
  • Loading branch information
bareketamir authored Nov 11, 2024
2 parents e14b4ca + fa32482 commit e8d6413
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
29 changes: 14 additions & 15 deletions airflow/providers/mysql/transfers/vertica_to_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,20 @@ def _bulk_load_transfer(self, mysql, vertica):
count += 1

tmpfile.flush()
self._run_preoperator(mysql)
try:
self.log.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
cursor.execute(
f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
f"INTO TABLE {self.mysql_table} "
f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})"
)
conn.commit()
tmpfile.close()
self.log.info("Inserted rows into MySQL %s", count)
except (MySQLdb.Error, MySQLdb.Warning):
self.log.info("Inserted rows into MySQL 0")
raise
self._run_preoperator(mysql)
try:
self.log.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
cursor.execute(
f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
f"INTO TABLE {self.mysql_table} "
f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})"
)
conn.commit()
self.log.info("Inserted rows into MySQL %s", count)
except (MySQLdb.Error, MySQLdb.Warning):
self.log.info("Inserted rows into MySQL 0")
raise

def _run_preoperator(self, mysql):
if self.mysql_preoperator:
Expand Down
25 changes: 17 additions & 8 deletions tests/providers/mysql/transfers/test_vertica_to_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,26 @@


def mock_get_conn():

class MockCol:
def __init__(self, name):
self.name = name

col_a = MockCol(name="a")
col_b = MockCol(name="b")
col_c = MockCol(name="c")

commit_mock = mock.MagicMock()
cursor_mock = mock.MagicMock(
execute=[],
fetchall=[["1", "2", "3"]],
description=["a", "b", "c"],
iterate=[["1", "2", "3"]],
)
conn_mock = mock.MagicMock(
commit=commit_mock,
cursor=cursor_mock,
description = [col_a, col_b, col_c]
)
cursor_mock.execute.return_value=[]
cursor_mock.fetchall.return_value=[["1", "2", "3"]]
cursor_mock.iterate.return_value=[["1", "2", "3"]]
conn_mock = mock.MagicMock()
conn_mock.commit.return_value = commit_mock
conn_mock.cursor.return_value = cursor_mock

return conn_mock


Expand Down

0 comments on commit e8d6413

Please sign in to comment.