From d1aa408ac7dfada75329e4c450c2d4f2329d3659 Mon Sep 17 00:00:00 2001 From: Alex Zorkin Date: Thu, 16 Jan 2025 20:17:26 -0800 Subject: [PATCH] fix: org txn fixes, compliance report and transfer fixes and optimization --- etl/nifi_scripts/clean_ups.groovy | 8 +- etl/nifi_scripts/compliance_report.groovy | 113 +++++++++++++++++++++- etl/nifi_scripts/transfer.groovy | 112 ++++++++++++++++++++- 3 files changed, 226 insertions(+), 7 deletions(-) diff --git a/etl/nifi_scripts/clean_ups.groovy b/etl/nifi_scripts/clean_ups.groovy index 8a893064d..d5afa003e 100644 --- a/etl/nifi_scripts/clean_ups.groovy +++ b/etl/nifi_scripts/clean_ups.groovy @@ -2,7 +2,7 @@ import java.sql.Connection import java.sql.PreparedStatement import java.sql.SQLException -log.warn('**** STARTING TRANSFER UPDATE SQL ****') +log.warn('**** STARTING CLEAN UPS UPDATE SQL ****') // SQL query to update transaction_effective_date def updateTransferEffectiveDateSQL = """ @@ -25,11 +25,11 @@ def cleanUpQueries = [ -- 273 UPDATE compliance_report SET transaction_id = null - WHERE compliance_report_id = 764; + WHERE transaction_id IN (1920, 5046, 5071, 5315); """, """ DELETE FROM "transaction" - WHERE transaction_id = 1920; + WHERE transaction_id IN (1920, 5046, 5071, 5315); """ ] @@ -84,4 +84,4 @@ try { } } -log.warn('**** COMPLETED TRANSFER UPDATE SQL ****') +log.warn('**** COMPLETED CLEAN UPS UPDATE SQL ****') diff --git a/etl/nifi_scripts/compliance_report.groovy b/etl/nifi_scripts/compliance_report.groovy index b9e5e8fe2..99bdfef78 100644 --- a/etl/nifi_scripts/compliance_report.groovy +++ b/etl/nifi_scripts/compliance_report.groovy @@ -100,6 +100,73 @@ WHERE cr.type_id = 1 ORDER BY cr.root_report_id NULLS FIRST, cr.traversal, cr.id; """ +// Temporarily remove (drop) all audit triggers on listed tables +// so no audit_log entries occur during this data load. +def dropAuditTriggers = """ +DO \$\$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ( + 'transaction', 'compliance_report', 'compliance_report_history', + 'compliance_report_status', 'compliance_report_summary', 'compliance_period', + 'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history', + 'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code', + 'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type', + 'fuel_export', 'organization', 'organization_address', 'organization_attorney_address', + 'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history', + 'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message', + 'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history', + 'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer', + 'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density', + 'energy_effectiveness_ratio', 'transport_mode', 'final_supply_equipment', 'level_of_equipment', + 'user_login_history', 'unit_of_measure', 'target_carbon_intensity' + ) + LOOP + EXECUTE format('DROP TRIGGER IF EXISTS audit_%I_insert_update_delete ON %I;', r.tablename, r.tablename); + END LOOP; +END; +\$\$; +""" + +// Re-add the audit triggers to each table in the list +def reAddAuditTriggers = """ +DO \$\$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ( + 'transaction', 'compliance_report', 'compliance_report_history', + 'compliance_report_status', 'compliance_report_summary', 'compliance_period', + 'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history', + 'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code', + 'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type', + 'fuel_export', 'organization', 'organization_address', 'organization_attorney_address', + 'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history', + 'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message', + 'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history', + 'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer', + 'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density', + 'energy_effectiveness_ratio', 'transport_mode', 'final_supply_equipment', 'level_of_equipment', + 'user_login_history', 'unit_of_measure', 'target_carbon_intensity' + ) + LOOP + EXECUTE format(' + CREATE TRIGGER audit_%I_insert_update_delete + AFTER INSERT OR UPDATE OR DELETE ON %I + FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();', + r.tablename, r.tablename); + END LOOP; +END; +\$\$; +""" + // ========================================= // NiFi Controller Services // ========================================= @@ -120,6 +187,9 @@ try { // Disable refresh of the materialized views def stmt = destinationConn.createStatement() + + stmt.execute(dropAuditTriggers) + stmt.execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;') stmt.execute(""" CREATE OR REPLACE FUNCTION refresh_transaction_aggregate() @@ -139,6 +209,24 @@ try { END; \$\$ LANGUAGE plpgsql; """) + stmt.execute('DROP FUNCTION IF EXISTS refresh_mv_org_compliance_report_count() CASCADE;') + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count() + RETURNS void AS \$\$ + BEGIN + -- Temporarily disable the materialized view refresh + END; + \$\$ LANGUAGE plpgsql; + """) + stmt.execute('DROP FUNCTION IF EXISTS refresh_mv_compliance_report_count() CASCADE;') + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_compliance_report_count() + RETURNS void AS \$\$ + BEGIN + -- Temporarily disable the materialized view refresh + END; + \$\$ LANGUAGE plpgsql; + """) stmt.close() // Load reference data for status mapping @@ -520,7 +608,30 @@ try { END; \$\$ LANGUAGE plpgsql; """) - stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count') + // stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count') + + stmt.execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count() + RETURNS void AS \$\$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count; + END; + \$\$ LANGUAGE plpgsql; + """) + // stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count') + + stmt.execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_compliance_report_count() + RETURNS void AS \$\$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_compliance_report_count; + END; + \$\$ LANGUAGE plpgsql; + """) + // stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_compliance_report_count') + + stmt.execute(reAddAuditTriggers) + stmt.close() log.warn("Inserted ${totalInserted} compliance reports, ${totalHistoryInserted} history records, and ${totalTransactionsInserted} transactions into LCFS.") diff --git a/etl/nifi_scripts/transfer.groovy b/etl/nifi_scripts/transfer.groovy index b74f66cff..8e7bcf19f 100644 --- a/etl/nifi_scripts/transfer.groovy +++ b/etl/nifi_scripts/transfer.groovy @@ -21,6 +21,7 @@ SELECT ct.update_user_id as update_user, ct.update_timestamp as update_date, ct.create_timestamp as create_date, + ctzr.description, json_agg ( json_build_object ( 'transfer_id', @@ -60,6 +61,7 @@ SELECT JOIN credit_trade_status cts ON ct.status_id = cts.id LEFT JOIN credit_trade_history cth ON cth.credit_trade_id = ct.id JOIN credit_trade_status cts_history ON cth.status_id = cts_history.id + LEFT JOIN credit_trade_zero_reason ctzr ON ctzr.id = ct.zero_reason_id WHERE ctt.the_type IN ('Buy', 'Sell') GROUP BY @@ -73,8 +75,7 @@ SELECT ctc.category, cts.status, ctzr.description, - ctt.the_type, - internal_comment.role_names; + ctt.the_type; """ def COMMENT_QUERY = ''' @@ -123,6 +124,74 @@ def INTERNAL_COMMENT_QUERY = """ ctc.credit_trade_id, ctc.create_timestamp; """ + +// Temporarily remove (drop) all audit triggers on listed tables +// so no audit_log entries occur during this data load. +def dropAuditTriggers = """ +DO \$\$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ( + 'transaction', 'compliance_report', 'compliance_report_history', + 'compliance_report_status', 'compliance_report_summary', 'compliance_period', + 'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history', + 'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code', + 'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type', + 'fuel_export', 'organization', 'organization_address', 'organization_attorney_address', + 'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history', + 'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message', + 'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history', + 'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer', + 'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density', + 'energy_effectiveness_ratio', 'transport_mode', 'final_supply_equipment', 'level_of_equipment', + 'user_login_history', 'unit_of_measure', 'target_carbon_intensity' + ) + LOOP + EXECUTE format('DROP TRIGGER IF EXISTS audit_%I_insert_update_delete ON %I;', r.tablename, r.tablename); + END LOOP; +END; +\$\$; +""" + +// Re-add the audit triggers to each table in the list +def reAddAuditTriggers = """ +DO \$\$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ( + 'transaction', 'compliance_report', 'compliance_report_history', + 'compliance_report_status', 'compliance_report_summary', 'compliance_period', + 'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history', + 'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code', + 'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type', + 'fuel_export', 'organization', 'organization_address', 'organization_attorney_address', + 'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history', + 'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message', + 'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history', + 'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer', + 'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density', + 'energy_effectiveness_ratio', 'transport_mode', 'final_supply_equipment', 'level_of_equipment', + 'user_login_history', 'unit_of_measure', 'target_carbon_intensity' + ) + LOOP + EXECUTE format(' + CREATE TRIGGER audit_%I_insert_update_delete + AFTER INSERT OR UPDATE OR DELETE ON %I + FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();', + r.tablename, r.tablename); + END LOOP; +END; +\$\$; +""" + def USER_ID_QUERY = 'select keycloak_username from user_profile up where user_profile_id = ? limit 1' // Fetch connections to both the source and destination databases @@ -142,8 +211,12 @@ try { def statements = prepareStatements(destinationConn) + destinationConn.createStatement().execute(dropAuditTriggers) + destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;') destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_transaction_count() CASCADE;') + destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_director_review_transaction_count() CASCADE;') + destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_org_compliance_report_count() CASCADE;') destinationConn.createStatement().execute(""" CREATE OR REPLACE FUNCTION refresh_transaction_aggregate() RETURNS void AS \$\$ @@ -160,6 +233,22 @@ try { END; \$\$ LANGUAGE plpgsql; """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_director_review_transaction_count() + RETURNS void AS \$\$ + BEGIN + -- Temporarily disable the materialized view refresh + END; + \$\$ LANGUAGE plpgsql; + """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count() + RETURNS void AS \$\$ + BEGIN + -- Temporarily disable the materialized view refresh + END; + \$\$ LANGUAGE plpgsql; + """) PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY) PreparedStatement commentStmt = sourceConn.prepareStatement(COMMENT_QUERY) @@ -224,7 +313,26 @@ try { END; \$\$ LANGUAGE plpgsql; """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_director_review_transaction_count() + RETURNS void AS \$\$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count; + END; + \$\$ LANGUAGE plpgsql; + """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count() + RETURNS void AS \$\$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count; + END; + \$\$ LANGUAGE plpgsql; + """) + destinationConn.createStatement().execute(reAddAuditTriggers) destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate') + destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count') + destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count') destinationConn.commit() log.debug("Processed ${recordCount} records successfully.")