Skip to content

Commit

Permalink
Fix error file not found. tmp file is deleted before inserting rows t…
Browse files Browse the repository at this point in the history
…o DB in VerticaToMySQLOperator bulk .
  • Loading branch information
Amir.Ba committed Sep 24, 2024
1 parent e14b4ca commit f919c26
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions airflow/providers/mysql/transfers/vertica_to_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,20 @@ def _bulk_load_transfer(self, mysql, vertica):
count += 1

tmpfile.flush()
self._run_preoperator(mysql)
try:
self.log.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
cursor.execute(
f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
f"INTO TABLE {self.mysql_table} "
f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})"
)
conn.commit()
tmpfile.close()
self.log.info("Inserted rows into MySQL %s", count)
except (MySQLdb.Error, MySQLdb.Warning):
self.log.info("Inserted rows into MySQL 0")
raise
self._run_preoperator(mysql)
try:
self.log.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
cursor.execute(
f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
f"INTO TABLE {self.mysql_table} "
f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})"
)
conn.commit()
self.log.info("Inserted rows into MySQL %s", count)
except (MySQLdb.Error, MySQLdb.Warning):
self.log.info("Inserted rows into MySQL 0")
raise

def _run_preoperator(self, mysql):
if self.mysql_preoperator:
Expand Down

0 comments on commit f919c26

Please sign in to comment.