Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: ETL Txns, CR and Transfer #1727

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions etl/nifi_scripts/clean_ups.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException

log.warn('**** STARTING TRANSFER UPDATE SQL ****')
log.warn('**** STARTING CLEAN UPS UPDATE SQL ****')

// SQL query to update transaction_effective_date
def updateTransferEffectiveDateSQL = """
Expand All @@ -25,11 +25,11 @@ def cleanUpQueries = [
-- 273
UPDATE compliance_report
SET transaction_id = null
WHERE compliance_report_id = 764;
WHERE transaction_id IN (1920, 5046, 5071, 5315);
""",
"""
DELETE FROM "transaction"
WHERE transaction_id = 1920;
WHERE transaction_id IN (1920, 5046, 5071, 5315);
"""
]

Expand Down Expand Up @@ -84,4 +84,4 @@ try {
}
}

log.warn('**** COMPLETED TRANSFER UPDATE SQL ****')
log.warn('**** COMPLETED CLEAN UPS UPDATE SQL ****')
113 changes: 112 additions & 1 deletion etl/nifi_scripts/compliance_report.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,73 @@ WHERE cr.type_id = 1
ORDER BY cr.root_report_id NULLS FIRST, cr.traversal, cr.id;
"""

// Temporarily remove (drop) the audit triggers on the listed public tables
// so no audit_log entries occur during this data load.
//
// The trigger name is assembled first and then quoted as ONE identifier (%I).
// Embedding %I in the middle of a name (audit_%I_...) would produce invalid
// SQL for any table name that needs quoting. Each table is listed only once.
def dropAuditTriggers = """
DO \$\$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT tablename
             FROM pg_tables
             WHERE schemaname = 'public'
               AND tablename IN (
                   'transaction', 'compliance_report', 'compliance_report_history',
                   'compliance_report_status', 'compliance_report_summary', 'compliance_period',
                   'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history',
                   'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code',
                   'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type',
                   'fuel_export', 'organization', 'organization_address', 'organization_attorney_address',
                   'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history',
                   'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message',
                   'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history',
                   'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer',
                   'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density',
                   'energy_effectiveness_ratio', 'transport_mode', 'level_of_equipment',
                   'user_login_history', 'unit_of_measure', 'target_carbon_intensity'
               )
    LOOP
        EXECUTE format(
            'DROP TRIGGER IF EXISTS %I ON %I;',
            'audit_' || r.tablename || '_insert_update_delete',
            r.tablename
        );
    END LOOP;
END;
\$\$;
"""

// Re-add the audit triggers to each table in the list.
//
// The trigger name is assembled first and quoted as ONE identifier (%I), and
// any trigger already present under that name is dropped before CREATE so the
// block can be re-run without failing on "trigger already exists".
// Each table is listed only once.
def reAddAuditTriggers = """
DO \$\$
DECLARE
    r RECORD;
    trig_name text;
BEGIN
    FOR r IN SELECT tablename
             FROM pg_tables
             WHERE schemaname = 'public'
               AND tablename IN (
                   'transaction', 'compliance_report', 'compliance_report_history',
                   'compliance_report_status', 'compliance_report_summary', 'compliance_period',
                   'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history',
                   'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code',
                   'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type',
                   'fuel_export', 'organization', 'organization_address', 'organization_attorney_address',
                   'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history',
                   'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message',
                   'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history',
                   'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer',
                   'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density',
                   'energy_effectiveness_ratio', 'transport_mode', 'level_of_equipment',
                   'user_login_history', 'unit_of_measure', 'target_carbon_intensity'
               )
    LOOP
        trig_name := 'audit_' || r.tablename || '_insert_update_delete';
        EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I;', trig_name, r.tablename);
        EXECUTE format('
            CREATE TRIGGER %I
            AFTER INSERT OR UPDATE OR DELETE ON %I
            FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();',
            trig_name, r.tablename);
    END LOOP;
END;
\$\$;
"""

// =========================================
// NiFi Controller Services
// =========================================
Expand All @@ -120,6 +187,9 @@ try {

// Disable refresh of the materialized views
def stmt = destinationConn.createStatement()

stmt.execute(dropAuditTriggers)

stmt.execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;')
stmt.execute("""
CREATE OR REPLACE FUNCTION refresh_transaction_aggregate()
Expand All @@ -139,6 +209,24 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
// Replace the two compliance-report count refresh functions with empty stubs
// for the duration of the load. All calls go through the already-open `stmt`
// (closed right after this section) instead of creating extra Statements that
// are never closed.
stmt.execute('DROP FUNCTION IF EXISTS refresh_mv_org_compliance_report_count() CASCADE;')
stmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count()
RETURNS void AS \$\$
BEGIN
-- Temporarily disable the materialized view refresh
END;
\$\$ LANGUAGE plpgsql;
""")
stmt.execute('DROP FUNCTION IF EXISTS refresh_mv_compliance_report_count() CASCADE;')
stmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_compliance_report_count()
RETURNS void AS \$\$
BEGIN
-- Temporarily disable the materialized view refresh
END;
\$\$ LANGUAGE plpgsql;
""")
stmt.close()

// Load reference data for status mapping
Expand Down Expand Up @@ -520,7 +608,30 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count')
// stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count')

// Redefine refresh_mv_org_compliance_report_count() so that it performs a
// concurrent refresh of mv_org_compliance_report_count.
stmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count()
RETURNS void AS \$\$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count;
END;
\$\$ LANGUAGE plpgsql;
""")
// stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count')

// Redefine refresh_mv_compliance_report_count() so that it performs a
// concurrent refresh of mv_compliance_report_count.
stmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_compliance_report_count()
RETURNS void AS \$\$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_compliance_report_count;
END;
\$\$ LANGUAGE plpgsql;
""")
// stmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_compliance_report_count')

// Run the reAddAuditTriggers DO block to create the audit triggers again.
stmt.execute(reAddAuditTriggers)

stmt.close()

log.warn("Inserted ${totalInserted} compliance reports, ${totalHistoryInserted} history records, and ${totalTransactionsInserted} transactions into LCFS.")
Expand Down
110 changes: 110 additions & 0 deletions etl/nifi_scripts/transfer.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ SELECT
ct.update_user_id as update_user,
ct.update_timestamp as update_date,
ct.create_timestamp as create_date,
ctzr.description,
json_agg (
json_build_object (
'transfer_id',
Expand Down Expand Up @@ -60,6 +61,7 @@ SELECT
JOIN credit_trade_status cts ON ct.status_id = cts.id
LEFT JOIN credit_trade_history cth ON cth.credit_trade_id = ct.id
JOIN credit_trade_status cts_history ON cth.status_id = cts_history.id
LEFT JOIN credit_trade_zero_reason ctzr ON ctzr.id = ct.zero_reason_id
WHERE
ctt.the_type IN ('Buy', 'Sell')
GROUP BY
Expand All @@ -72,6 +74,7 @@ SELECT
ct.number_of_credits,
ctc.category,
cts.status,
ctzr.description,
ctt.the_type;
"""

Expand Down Expand Up @@ -121,6 +124,74 @@ def INTERNAL_COMMENT_QUERY = """
ctc.credit_trade_id, ctc.create_timestamp;
"""


// Temporarily remove (drop) the audit triggers on the listed public tables
// so no audit_log entries occur during this data load.
//
// The trigger name is assembled first and then quoted as ONE identifier (%I).
// Embedding %I in the middle of a name (audit_%I_...) would produce invalid
// SQL for any table name that needs quoting. Each table is listed only once.
def dropAuditTriggers = """
DO \$\$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT tablename
             FROM pg_tables
             WHERE schemaname = 'public'
               AND tablename IN (
                   'transaction', 'compliance_report', 'compliance_report_history',
                   'compliance_report_status', 'compliance_report_summary', 'compliance_period',
                   'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history',
                   'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code',
                   'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type',
                   'fuel_export', 'organization', 'organization_address', 'organization_attorney_address',
                   'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history',
                   'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message',
                   'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history',
                   'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer',
                   'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density',
                   'energy_effectiveness_ratio', 'transport_mode', 'level_of_equipment',
                   'user_login_history', 'unit_of_measure', 'target_carbon_intensity'
               )
    LOOP
        EXECUTE format(
            'DROP TRIGGER IF EXISTS %I ON %I;',
            'audit_' || r.tablename || '_insert_update_delete',
            r.tablename
        );
    END LOOP;
END;
\$\$;
"""

// Re-add the audit triggers to each table in the list.
//
// The trigger name is assembled first and quoted as ONE identifier (%I), and
// any trigger already present under that name is dropped before CREATE so the
// block can be re-run without failing on "trigger already exists".
// Each table is listed only once.
def reAddAuditTriggers = """
DO \$\$
DECLARE
    r RECORD;
    trig_name text;
BEGIN
    FOR r IN SELECT tablename
             FROM pg_tables
             WHERE schemaname = 'public'
               AND tablename IN (
                   'transaction', 'compliance_report', 'compliance_report_history',
                   'compliance_report_status', 'compliance_report_summary', 'compliance_period',
                   'initiative_agreement', 'initiative_agreement_status', 'initiative_agreement_history',
                   'allocation_agreement', 'allocation_transaction_type', 'custom_fuel_type', 'fuel_code',
                   'fuel_code_prefix', 'fuel_code_status', 'fuel_category', 'fuel_instance', 'fuel_type',
                   'fuel_export', 'organization', 'organization_address', 'organization_attorney_address',
                   'organization_status', 'organization_type', 'transfer', 'transfer_category', 'transfer_history',
                   'transfer_status', 'internal_comment', 'user_profile', 'user_role', 'role', 'notification_message',
                   'notification_type', 'admin_adjustment', 'admin_adjustment_status', 'admin_adjustment_history',
                   'provision_of_the_act', 'supplemental_report', 'final_supply_equipment', 'notional_transfer',
                   'fuel_supply', 'additional_carbon_intensity', 'document', 'end_use_type', 'energy_density',
                   'energy_effectiveness_ratio', 'transport_mode', 'level_of_equipment',
                   'user_login_history', 'unit_of_measure', 'target_carbon_intensity'
               )
    LOOP
        trig_name := 'audit_' || r.tablename || '_insert_update_delete';
        EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I;', trig_name, r.tablename);
        EXECUTE format('
            CREATE TRIGGER %I
            AFTER INSERT OR UPDATE OR DELETE ON %I
            FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();',
            trig_name, r.tablename);
    END LOOP;
END;
\$\$;
"""

// Parameterized lookup: selects keycloak_username from user_profile for the
// single bound user_profile_id (the `?` placeholder), returning at most one row.
def USER_ID_QUERY = 'select keycloak_username from user_profile up where user_profile_id = ? limit 1'

// Fetch connections to both the source and destination databases
Expand All @@ -140,8 +211,12 @@ try {

def statements = prepareStatements(destinationConn)

// Pre-load DDL: drop the audit triggers and the materialized-view refresh
// functions. One Statement is shared and always closed, instead of creating
// a new, never-closed Statement for every call.
def preLoadDropStmt = destinationConn.createStatement()
try {
    preLoadDropStmt.execute(dropAuditTriggers)

    preLoadDropStmt.execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;')
    preLoadDropStmt.execute('DROP FUNCTION IF EXISTS refresh_mv_transaction_count() CASCADE;')
    preLoadDropStmt.execute('DROP FUNCTION IF EXISTS refresh_mv_director_review_transaction_count() CASCADE;')
    preLoadDropStmt.execute('DROP FUNCTION IF EXISTS refresh_mv_org_compliance_report_count() CASCADE;')
} finally {
    preLoadDropStmt.close()
}
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_transaction_aggregate()
RETURNS void AS \$\$
Expand All @@ -158,6 +233,22 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
// Install empty stub versions of two refresh functions for the duration of
// the load. One Statement is shared and always closed, instead of creating a
// new, never-closed Statement for every call.
def refreshStubStmt = destinationConn.createStatement()
try {
    refreshStubStmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_director_review_transaction_count()
RETURNS void AS \$\$
BEGIN
-- Temporarily disable the materialized view refresh
END;
\$\$ LANGUAGE plpgsql;
""")
    refreshStubStmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count()
RETURNS void AS \$\$
BEGIN
-- Temporarily disable the materialized view refresh
END;
\$\$ LANGUAGE plpgsql;
""")
} finally {
    refreshStubStmt.close()
}

PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY)
PreparedStatement commentStmt = sourceConn.prepareStatement(COMMENT_QUERY)
Expand Down Expand Up @@ -222,7 +313,26 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
// Post-load DDL, in the original order: redefine the two refresh functions
// with their real bodies, re-add the audit triggers, then refresh the
// materialized views. One Statement is shared and always closed, instead of
// creating a new, never-closed Statement for every call.
def postLoadStmt = destinationConn.createStatement()
try {
    postLoadStmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_director_review_transaction_count()
RETURNS void AS \$\$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count;
END;
\$\$ LANGUAGE plpgsql;
""")
    postLoadStmt.execute("""
CREATE OR REPLACE FUNCTION refresh_mv_org_compliance_report_count()
RETURNS void AS \$\$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count;
END;
\$\$ LANGUAGE plpgsql;
""")
    postLoadStmt.execute(reAddAuditTriggers)
    postLoadStmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate')
    postLoadStmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_director_review_transaction_count')
    postLoadStmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_org_compliance_report_count')
    postLoadStmt.execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count')
} finally {
    postLoadStmt.close()
}

destinationConn.commit()
Expand Down
Loading