Skip to content

Commit

Permalink
Merge pull request #1700 from bcgov/feat/alex-etl-updates-250114
Browse files Browse the repository at this point in the history
Feat: ETL Clean Up Script Extended
  • Loading branch information
AlexZorkin authored Jan 16, 2025
2 parents f786ea2 + 0ddfee4 commit 1a52e70
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
Binary file modified etl/database/nifi-registry-primary.mv.db
Binary file not shown.
Binary file modified etl/nifi/conf/flow.json.gz
Binary file not shown.
Binary file modified etl/nifi/conf/flow.xml.gz
Binary file not shown.
73 changes: 53 additions & 20 deletions etl/nifi_scripts/clean_ups.groovy
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import groovy.json.JsonSlurper
import java.sql.SQLException

log.warn('**** STARTING TRANSFER UPDATE SQL ****')

Expand All @@ -14,39 +13,73 @@ def updateTransferEffectiveDateSQL = """
AND update_date::date = transaction_effective_date::date; -- On the same day as transaction_effective_date
"""

// Fetch connections to both source and destination databases
// Replace the UUIDs with your actual Controller Service identifiers
// For this UPDATE, only the destination database connection is required
def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93')
// Cleanup queries
def cleanUpQueries = [
"""
-- 105
UPDATE "transaction"
SET compliance_units = -6994
WHERE transaction_id = 1491;
""",
"""
-- 273
UPDATE compliance_report
SET transaction_id = null
WHERE compliance_report_id = 764;
""",
"""
DELETE FROM "transaction"
WHERE transaction_id = 1920;
"""
]

// Initialize database connections
// Fetch connection to the destination database
def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93')
Connection destinationConn = null

try {
// Get a connection from the Destination DBCP Connection Pool
// Obtain a connection from the Destination DBCP Connection Pool
destinationConn = destinationDbcpService.getConnection()
destinationConn.setAutoCommit(false) // Begin transaction

// Step 1: Execute the UPDATE statement
PreparedStatement updateStmt = destinationConn.prepareStatement(updateTransferEffectiveDateSQL)

// Execute the UPDATE statement
int rowsUpdated = updateStmt.executeUpdate()
// Step 1: Execute the UPDATE on public.transfer
try (PreparedStatement updateStmt = destinationConn.prepareStatement(updateTransferEffectiveDateSQL)) {
int rowsUpdated = updateStmt.executeUpdate()
log.info("Successfully executed UPDATE on 'public.transfer'. Rows affected: ${rowsUpdated}")
}

log.info("Successfully executed UPDATE on 'public.transfer'. Rows affected: ${rowsUpdated}")
// Step 2: Execute the cleanup queries in sequence
cleanUpQueries.each { query ->
try (PreparedStatement stmt = destinationConn.prepareStatement(query)) {
stmt.executeUpdate()
}
}
log.info("Cleanup queries executed successfully.")

// Close the UPDATE statement
updateStmt.close()
// Commit transaction
destinationConn.commit()
log.info("Transaction committed successfully.")

} catch (Exception e) {
log.error('Error occurred while executing TRANSFER UPDATE SQL', e)
throw new ProcessException(e)
// Rollback transaction on error
if (destinationConn != null) {
try {
destinationConn.rollback()
log.warn("Transaction rolled back due to error.")
} catch (SQLException rollbackEx) {
log.error("Error occurred during transaction rollback", rollbackEx)
}
}
log.error('Error occurred during SQL operations', e)
throw new RuntimeException(e)
} finally {
// Ensure the connection is closed
if (destinationConn != null) {
try {
destinationConn.close()
} catch (SQLException ignore) {
// Ignored
log.info("Database connection closed.")
} catch (SQLException closeEx) {
log.warn("Error occurred while closing the database connection", closeEx)
}
}
}
Expand Down

0 comments on commit 1a52e70

Please sign in to comment.