diff --git a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 3fe4ccf23107..06b9a94171b1 100644
--- a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -174,6 +174,7 @@ class GCSToBigQueryOperator(BaseOperator):
         destination table is newly created. If the table already exists and a value different
         than the current description is provided, the job will fail.
     :param deferrable: Run operator in the deferrable mode
+    :param force_delete: Force the destination table to be deleted if it already exists.
     """
 
     template_fields: Sequence[str] = (
@@ -231,6 +232,7 @@ def __init__(
         force_rerun: bool = True,
         reattach_states: set[str] | None = None,
         project_id: str = PROVIDE_PROJECT_ID,
+        force_delete: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -296,6 +298,7 @@ def __init__(
         self.force_rerun = force_rerun
         self.reattach_states: set[str] = reattach_states or set()
         self.cancel_on_kill = cancel_on_kill
+        self.force_delete = force_delete
 
         self.source_uris: list[str] = []
 
@@ -378,7 +381,11 @@ def execute(self, context: Context):
                 max_id = self._find_max_value_in_column()
                 return max_id
         else:
-            self.log.info("Using existing BigQuery table for storing data...")
+            if self.force_delete:
+                self.log.info("Deleting table %s", self.destination_project_dataset_table)
+                hook.delete_table(table_id=self.destination_project_dataset_table)
+            else:
+                self.log.info("Using existing BigQuery table for storing data...")
             self.configuration = self._use_existing_table()
 
         try:
diff --git a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
index 046e9a711bbf..1097faa7491b 100644
--- a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -1946,3 +1946,28 @@ def create_context(self, task):
             "task_instance": task_instance,
             "logical_date": logical_date,
         }
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_force_delete_should_execute_successfully(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        hook.return_value.get_job.return_value.result.return_value = ("1",)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            schema_fields=SCHEMA_FIELDS_INT,
+            autodetect=True,
+            project_id=JOB_PROJECT_ID,
+            force_delete=True,
+        )
+
+        operator.execute(context=MagicMock())
+        hook.return_value.delete_table.assert_called_once_with(table_id=TEST_EXPLICIT_DEST)
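
A minimal usage sketch of the new flag from the caller's side (the task id, bucket, object path, and table name below are illustrative placeholders, not values from this patch). With force_delete=True the operator deletes the destination table before running the load job, rather than loading into the existing table:

    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    load_csv = GCSToBigQueryOperator(
        task_id="gcs_to_bq_force_delete_example",  # placeholder task id
        bucket="example-bucket",  # placeholder bucket name
        source_objects=["data/sample.csv"],  # placeholder object path
        destination_project_dataset_table="example-project.example_dataset.example_table",  # placeholder
        autodetect=True,
        force_delete=True,  # new flag: drop the destination table first if it already exists
    )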