diff --git a/etl/nifi_scripts/adminAdjTrxn.groovy b/etl/nifi_scripts/adminAdjTrxn.groovy index 27806df95..d4286a8ab 100644 --- a/etl/nifi_scripts/adminAdjTrxn.groovy +++ b/etl/nifi_scripts/adminAdjTrxn.groovy @@ -8,32 +8,6 @@ import java.sql.Timestamp log.warn("**** STARTED ADMIN ADJUSTMENT ETL ****") def SOURCE_QUERY = """ -WITH - internal_comment AS ( - SELECT - ctc.id, - ctc.credit_trade_id, - ctc.credit_trade_comment, - ctc.create_user_id, - ctc.create_timestamp, - STRING_AGG (r."name", '; ') AS role_names - FROM - credit_trade_comment ctc - JOIN "user" u ON u.id = ctc.create_user_id - AND u.organization_id = 1 - AND ctc.is_privileged_access = TRUE - JOIN user_role ur ON ur.user_id = u.id - JOIN "role" r ON ur.role_id = r.id - GROUP BY - ctc.id, - ctc.credit_trade_id, - ctc.credit_trade_comment, - ctc.create_user_id, - ctc.create_timestamp - ORDER BY - ctc.credit_trade_id, - ctc.create_timestamp - ) SELECT ct.id AS admin_adjustment_id, ct.respondent_id AS to_organization_id, @@ -44,10 +18,6 @@ WITH ct.update_user_id as update_user, ct.update_timestamp as update_date, ct.create_timestamp as create_date, - -- Aggregate comments from government with internal comment handling - STRING_AGG (DISTINCT gov_ctc.credit_trade_comment, '; ') AS gov_comment, - -- JSON aggregation for internal comments - json_agg (row_to_json (internal_comment)) AS internal_comments, -- JSON aggregation for credit trade history json_agg ( json_build_object ( @@ -77,41 +47,6 @@ WITH JOIN credit_trade_type ctt ON ct.type_id = ctt.id LEFT OUTER JOIN credit_trade_category ctc ON ct.trade_category_id = ctc.id JOIN credit_trade_status cts ON ct.status_id = cts.id - LEFT JOIN credit_trade_zero_reason ctzr ON ctzr.id = ct.zero_reason_id - AND ctzr.reason = 'Internal' - -- Join for Initiator Comments - LEFT JOIN credit_trade_comment from_ctc ON from_ctc.credit_trade_id = ct.id - AND from_ctc.create_user_id IN ( - SELECT - u.id - FROM - "user" u - WHERE - u.organization_id = ct.initiator_id - ) - -- Join for Respondent Comments - LEFT JOIN credit_trade_comment to_ctc ON to_ctc.credit_trade_id = ct.id - AND to_ctc.create_user_id IN ( - SELECT - u.id - FROM - "user" u - WHERE - u.organization_id = ct.respondent_id - ) - -- Join for Government Comments - LEFT JOIN credit_trade_comment gov_ctc ON gov_ctc.credit_trade_id = ct.id - AND gov_ctc.create_user_id IN ( - SELECT - u.id - FROM - "user" u - WHERE - u.organization_id = 1 - AND gov_ctc.is_privileged_access = FALSE - ) - -- Join the internal comment logic for role-based filtering and audience_scope - LEFT JOIN internal_comment ON internal_comment.credit_trade_id = ct.id -- Join for credit trade history LEFT JOIN credit_trade_history cth ON cth.credit_trade_id = ct.id JOIN credit_trade_status cts_history ON cth.status_id = cts_history.id @@ -123,10 +58,49 @@ WITH ct.date_of_written_agreement, ct.trade_effective_date, ct.number_of_credits, - cts.status, - ctzr.description, - internal_comment.role_names; - """ + cts.status; + """ +def COMMENT_QUERY = ''' + SELECT + ct.id AS credit_trade_id, + MAX(CASE + WHEN u.organization_id = 1 AND ctc.is_privileged_access = FALSE THEN ctc.credit_trade_comment + END) AS gov_comment + FROM + credit_trade ct + LEFT JOIN credit_trade_comment ctc ON ctc.credit_trade_id = ct.id + LEFT JOIN "user" u ON ctc.create_user_id = u.id + WHERE + ct.id = ? + GROUP BY + ct.id; +''' + +def INTERNAL_COMMENT_QUERY = """ + SELECT + ctc.id, + ctc.credit_trade_id, + ctc.credit_trade_comment, + ctc.create_user_id, + ctc.create_timestamp, + ctc.update_timestamp, + STRING_AGG (r."name", '; ') AS role_names + FROM + credit_trade_comment ctc + JOIN "user" u ON u.id = ctc.create_user_id + AND u.organization_id = 1 + AND ctc.is_privileged_access = TRUE + JOIN user_role ur ON ur.user_id = u.id + JOIN "role" r ON ur.role_id = r.id + WHERE ctc.credit_trade_id = ? + GROUP BY + ctc.id, ctc.credit_trade_id, ctc.credit_trade_comment, + ctc.create_user_id, ctc.create_timestamp + ORDER BY + ctc.credit_trade_id, ctc.create_timestamp; +""" + +def USER_ID_QUERY = 'select keycloak_username from user_profile up where user_profile_id = ? limit 1' // Fetch connections to both the source and destination databases def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') @@ -146,6 +120,7 @@ try { def statements = prepareStatements(destinationConn) destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;') + destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_transaction_count() CASCADE;') destinationConn.createStatement().execute(""" CREATE OR REPLACE FUNCTION refresh_transaction_aggregate() RETURNS void AS \$\$ @@ -154,8 +129,19 @@ try { END; \$\$ LANGUAGE plpgsql; """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_transaction_count() + RETURNS void AS \$\$ + BEGIN + -- Temporarily disable the materialized view refresh + END; + \$\$ LANGUAGE plpgsql; + """) PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY) + PreparedStatement commentStmt = sourceConn.prepareStatement(COMMENT_QUERY) + PreparedStatement internalCommentStmt = sourceConn.prepareStatement(INTERNAL_COMMENT_QUERY) + PreparedStatement getUserNameStmt = destinationConn.prepareStatement(USER_ID_QUERY) ResultSet resultSet = sourceStmt.executeQuery() int recordCount = 0 @@ -163,22 +149,19 @@ try { while (resultSet.next()) { recordCount++ def jsonSlurper = new JsonSlurper() - def internalComments = resultSet.getString('internal_comments') def creditTradeHistory = resultSet.getString('credit_trade_history') - - def internalCommentsJson = internalComments ? jsonSlurper.parseText(internalComments) : [] def creditTradeHistoryJson = creditTradeHistory ? jsonSlurper.parseText(creditTradeHistory) : [] def toTransactionId = processTransactions(resultSet.getString('current_status'), - resultSet, statements.transactionStmt) + resultSet, statements.transactionStmt, getUserNameStmt) def adminAdjustmentId = insertadminAdjustment(resultSet, statements.adminAdjustmentStmt, - toTransactionId, preparedData, destinationConn) + toTransactionId, preparedData, destinationConn, getUserNameStmt, commentStmt) if (adminAdjustmentId) { processHistory(adminAdjustmentId, creditTradeHistoryJson, statements.historyStmt, preparedData) - processInternalComments(adminAdjustmentId, internalCommentsJson, statements.internalCommentStmt, - statements.adminAdjustmentInternalCommentStmt) + processInternalComments(adminAdjustmentId, internalCommentStmt, statements.internalCommentStmt, + getUserNameStmt, statements.adminAdjustmentInternalCommentStmt) } else { log.warn("admin-adjustment not inserted for record: ${resultSet.getInt('admin_adjustment_id')}") } @@ -192,7 +175,16 @@ try { END; \$\$ LANGUAGE plpgsql; """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_transaction_count() + RETURNS void AS \$\$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count; + END; + \$\$ LANGUAGE plpgsql; + """) destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate') + destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count') destinationConn.commit() log.debug("Processed ${recordCount} records successfully.") @@ -268,8 +260,8 @@ def prepareStatements(Connection conn) { ''' def INSERT_INTERNAL_COMMENT_SQL = ''' INSERT INTO internal_comment ( - internal_comment_id, comment, audience_scope, create_user, create_date - ) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?) + internal_comment_id, comment, audience_scope, create_user, create_date, update_date + ) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?, ?) RETURNING internal_comment_id ''' def INSERT_admin_adjustment_INTERNAL_COMMENT_SQL = ''' @@ -302,22 +294,44 @@ def toSqlTimestamp(String timestampString) { } } -def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt) { +def getUserName(PreparedStatement stmt, int userId) { + ResultSet rs = null + String userName = null + + try { + stmt.setInt(1, userId) + rs = stmt.executeQuery() + + if (rs.next()) { + userName = rs.getString('keycloak_username') + } else { + log.warn("No username found for user_id: ${userId}") + } + } catch (Exception e) { + log.error("Error while fetching username for user_id: ${userId}", e) + } finally { + if (rs != null) rs.close() + } + + return userName +} + +def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt, PreparedStatement getUserNameStmt) { def toTransactionId = null if (currentStatus == 'Approved') { - toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id')) + toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'), getUserNameStmt) } return toTransactionId } -def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId) { +def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId, PreparedStatement getUserNameStmt) { stmt.setInt(1, rs.getInt('compliance_units')) stmt.setInt(2, orgId) stmt.setString(3, action) stmt.setDate(4, rs.getDate('transaction_effective_date') ?: rs.getDate('agreement_date')) - stmt.setInt(5, rs.getInt('create_user')) + stmt.setString(5, getUserName(getUserNameStmt, rs.getInt('create_user'))) stmt.setTimestamp(6, rs.getTimestamp('create_date')) def result = stmt.executeQuery() @@ -355,21 +369,24 @@ def processHistory(Integer adminAdjustmentId, List creditTradeHistory, PreparedS historyStmt.executeBatch() } - -def processInternalComments(Integer adminAdjustmentId, List internalComments, +def processInternalComments(Integer adminAdjustmentId, PreparedStatement sourceInternalCommentStmt, PreparedStatement internalCommentStmt, + PreparedStatement getUserNameStmt, PreparedStatement adminAdjustmentInternalCommentStmt) { - if (!internalComments) return - - internalComments.each { comment -> - if (!comment) return // Skip null comments - + // Fetch internal comments + sourceInternalCommentStmt.setInt(1, adminAdjustmentId) + ResultSet internalCommentResult = sourceInternalCommentStmt.executeQuery() + while (internalCommentResult.next()) { try { // Insert the internal comment - internalCommentStmt.setString(1, comment.credit_trade_comment ?: '') - internalCommentStmt.setString(2, getAudienceScope(comment.role_names ?: '')) - internalCommentStmt.setInt(3, comment.create_user_id ?: null) - internalCommentStmt.setTimestamp(4, toSqlTimestamp(comment.create_timestamp ?: '2013-01-01T00:00:00Z')) + internalCommentStmt.setString(1, internalCommentResult.getString('credit_trade_comment') ?: '') + internalCommentStmt.setString(2, getAudienceScope(internalCommentResult.getString('role_names') ?: '')) + internalCommentStmt.setString(3, + getUserName(getUserNameStmt, internalCommentResult.getInt('create_user_id') ?: null)) + internalCommentStmt.setTimestamp(4, + internalCommentResult.getTimestamp('create_timestamp') ?: null) + internalCommentStmt.setTimestamp(5, + internalCommentResult.getTimestamp('update_timestamp') ?: null) def internalCommentId = null def commentResult = internalCommentStmt.executeQuery() @@ -380,14 +397,15 @@ def processInternalComments(Integer adminAdjustmentId, List internalComments, adminAdjustmentInternalCommentStmt.setInt(1, adminAdjustmentId) adminAdjustmentInternalCommentStmt.setInt(2, internalCommentId) adminAdjustmentInternalCommentStmt.executeUpdate() - } + } commentResult.close() } catch (Exception e) { log.error("Error processing internal comment for admin-adjustment ${adminAdjustmentId}: ${e.getMessage()}", e) } } - } + internalCommentResult.close() +} // Helper function to determine audience scope based on role names def getAudienceScope(String roleNames) { @@ -404,7 +422,7 @@ def getAudienceScope(String roleNames) { } def insertadminAdjustment(ResultSet rs, PreparedStatement adminAdjustmentStmt, - Long toTransactionId, Map preparedData, Connection conn) { + Long toTransactionId, Map preparedData, Connection conn, PreparedStatement getUserNameStmt, PreparedStatement commentStmt) { // Check for duplicates in the `admin_adjustment` table def adminAdjustmentId = rs.getInt('admin_adjustment_id') def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM admin_adjustment WHERE admin_adjustment_id = ?') @@ -419,14 +437,20 @@ def insertadminAdjustment(ResultSet rs, PreparedStatement adminAdjustmentStmt, log.warn("Duplicate admin_adjustment detected with admin_adjustment_id: ${adminAdjustmentId}, skipping insertion.") return null } - + // Fetch comments + commentStmt.setInt(1, adminAdjustmentId) + ResultSet commentResult = commentStmt.executeQuery() + def govComment = null + if (commentResult.next()) { + govComment = commentResult.getString('gov_comment') + } // Proceed with insertion if no duplicate exists def statusId = getStatusId(rs.getString('current_status'), preparedData) adminAdjustmentStmt.setInt(1, rs.getInt('to_organization_id')) adminAdjustmentStmt.setObject(2, toTransactionId) adminAdjustmentStmt.setTimestamp(3, rs.getTimestamp('transaction_effective_date')) adminAdjustmentStmt.setInt(4, rs.getInt('compliance_units')) - adminAdjustmentStmt.setString(5, rs.getString('gov_comment')) + adminAdjustmentStmt.setString(5, govComment) adminAdjustmentStmt.setObject(6, statusId) adminAdjustmentStmt.setTimestamp(7, rs.getTimestamp('create_date')) adminAdjustmentStmt.setTimestamp(8, rs.getTimestamp('update_date')) diff --git a/etl/nifi_scripts/initiativeAgrmtTrxn.groovy b/etl/nifi_scripts/initiativeAgrmtTrxn.groovy index cb615d037..735ad90b2 100644 --- a/etl/nifi_scripts/initiativeAgrmtTrxn.groovy +++ b/etl/nifi_scripts/initiativeAgrmtTrxn.groovy @@ -8,32 +8,6 @@ import java.sql.Timestamp log.warn("**** STARTING INITIATIVE AGREEMENT ETL ****") def SOURCE_QUERY = """ -WITH - internal_comment AS ( - SELECT - ctc.id, - ctc.credit_trade_id, - ctc.credit_trade_comment, - ctc.create_user_id, - ctc.create_timestamp, - STRING_AGG (r."name", '; ') AS role_names - FROM - credit_trade_comment ctc - JOIN "user" u ON u.id = ctc.create_user_id - AND u.organization_id = 1 - AND ctc.is_privileged_access = TRUE - JOIN user_role ur ON ur.user_id = u.id - JOIN "role" r ON ur.role_id = r.id - GROUP BY - ctc.id, - ctc.credit_trade_id, - ctc.credit_trade_comment, - ctc.create_user_id, - ctc.create_timestamp - ORDER BY - ctc.credit_trade_id, - ctc.create_timestamp - ) SELECT ct.id AS initiative_agreement_id, ct.respondent_id AS to_organization_id, @@ -44,10 +18,6 @@ WITH ct.update_user_id as update_user, ct.update_timestamp as update_date, ct.create_timestamp as create_date, - -- Aggregate comments from government with internal comment handling - STRING_AGG (DISTINCT gov_ctc.credit_trade_comment, '; ') AS gov_comment, - -- JSON aggregation for internal comments - json_agg (row_to_json (internal_comment)) AS internal_comments, -- JSON aggregation for credit trade history json_agg ( json_build_object ( @@ -77,41 +47,6 @@ WITH JOIN credit_trade_type ctt ON ct.type_id = ctt.id LEFT OUTER JOIN credit_trade_category ctc ON ct.trade_category_id = ctc.id JOIN credit_trade_status cts ON ct.status_id = cts.id - LEFT JOIN credit_trade_zero_reason ctzr ON ctzr.id = ct.zero_reason_id - AND ctzr.reason = 'Internal' - -- Join for Initiator Comments - LEFT JOIN credit_trade_comment from_ctc ON from_ctc.credit_trade_id = ct.id - AND from_ctc.create_user_id IN ( - SELECT - u.id - FROM - "user" u - WHERE - u.organization_id = ct.initiator_id - ) - -- Join for Respondent Comments - LEFT JOIN credit_trade_comment to_ctc ON to_ctc.credit_trade_id = ct.id - AND to_ctc.create_user_id IN ( - SELECT - u.id - FROM - "user" u - WHERE - u.organization_id = ct.respondent_id - ) - -- Join for Government Comments - LEFT JOIN credit_trade_comment gov_ctc ON gov_ctc.credit_trade_id = ct.id - AND gov_ctc.create_user_id IN ( - SELECT - u.id - FROM - "user" u - WHERE - u.organization_id = 1 - AND gov_ctc.is_privileged_access = FALSE - ) - -- Join the internal comment logic for role-based filtering and audience_scope - LEFT JOIN internal_comment ON internal_comment.credit_trade_id = ct.id -- Join for credit trade history LEFT JOIN credit_trade_history cth ON cth.credit_trade_id = ct.id JOIN credit_trade_status cts_history ON cth.status_id = cts_history.id @@ -123,10 +58,49 @@ WITH ct.date_of_written_agreement, ct.trade_effective_date, ct.number_of_credits, - cts.status, - ctzr.description, - internal_comment.role_names; + cts.status; """ +def COMMENT_QUERY = ''' + SELECT + ct.id AS credit_trade_id, + MAX(CASE + WHEN u.organization_id = 1 AND ctc.is_privileged_access = FALSE THEN ctc.credit_trade_comment + END) AS gov_comment + FROM + credit_trade ct + LEFT JOIN credit_trade_comment ctc ON ctc.credit_trade_id = ct.id + LEFT JOIN "user" u ON ctc.create_user_id = u.id + WHERE + ct.id = ? + GROUP BY + ct.id; +''' + +def INTERNAL_COMMENT_QUERY = """ + SELECT + ctc.id, + ctc.credit_trade_id, + ctc.credit_trade_comment, + ctc.create_user_id, + ctc.create_timestamp, + ctc.update_timestamp, + STRING_AGG (r."name", '; ') AS role_names + FROM + credit_trade_comment ctc + JOIN "user" u ON u.id = ctc.create_user_id + AND u.organization_id = 1 + AND ctc.is_privileged_access = TRUE + JOIN user_role ur ON ur.user_id = u.id + JOIN "role" r ON ur.role_id = r.id + WHERE ctc.credit_trade_id = ? + GROUP BY + ctc.id, ctc.credit_trade_id, ctc.credit_trade_comment, + ctc.create_user_id, ctc.create_timestamp + ORDER BY + ctc.credit_trade_id, ctc.create_timestamp; +""" + +def USER_ID_QUERY = 'select keycloak_username from user_profile up where user_profile_id = ? limit 1' // Fetch connections to both the source and destination databases def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') @@ -146,6 +120,7 @@ try { def statements = prepareStatements(destinationConn) destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;') + destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_transaction_count() CASCADE;') destinationConn.createStatement().execute(""" CREATE OR REPLACE FUNCTION refresh_transaction_aggregate() RETURNS void AS \$\$ @@ -154,8 +129,19 @@ try { END; \$\$ LANGUAGE plpgsql; """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_transaction_count() + RETURNS void AS \$\$ + BEGIN + -- Temporarily disable the materialized view refresh + END; + \$\$ LANGUAGE plpgsql; + """) PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY) + PreparedStatement commentStmt = sourceConn.prepareStatement(COMMENT_QUERY) + PreparedStatement internalCommentStmt = sourceConn.prepareStatement(INTERNAL_COMMENT_QUERY) + PreparedStatement getUserNameStmt = destinationConn.prepareStatement(USER_ID_QUERY) ResultSet resultSet = sourceStmt.executeQuery() int recordCount = 0 @@ -163,22 +149,19 @@ try { while (resultSet.next()) { recordCount++ def jsonSlurper = new JsonSlurper() - def internalComments = resultSet.getString('internal_comments') def creditTradeHistory = resultSet.getString('credit_trade_history') - - def internalCommentsJson = internalComments ? jsonSlurper.parseText(internalComments) : [] def creditTradeHistoryJson = creditTradeHistory ? jsonSlurper.parseText(creditTradeHistory) : [] def toTransactionId = processTransactions(resultSet.getString('current_status'), - resultSet, statements.transactionStmt) + resultSet, statements.transactionStmt, getUserNameStmt) def initiativeAgreementId = insertInitiativeAgreement(resultSet, statements.initiativeAgreementStmt, - toTransactionId, preparedData, destinationConn) + toTransactionId, preparedData, destinationConn, getUserNameStmt, commentStmt) if (initiativeAgreementId) { processHistory(initiativeAgreementId, creditTradeHistoryJson, statements.historyStmt, preparedData) - processInternalComments(initiativeAgreementId, internalCommentsJson, statements.internalCommentStmt, - statements.initiativeAgreementInternalCommentStmt) + processInternalComments(initiativeAgreementId, internalCommentStmt, statements.internalCommentStmt, + getUserNameStmt, statements.initiativeAgreementInternalCommentStmt) } else { log.warn("initiative-agreement not inserted for record: ${resultSet.getInt('initiative_agreement_id')}") } @@ -192,7 +175,16 @@ try { END; \$\$ LANGUAGE plpgsql; """) + destinationConn.createStatement().execute(""" + CREATE OR REPLACE FUNCTION refresh_mv_transaction_count() + RETURNS void AS \$\$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count; + END; + \$\$ LANGUAGE plpgsql; + """) destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate') + destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count') destinationConn.commit() log.debug("Processed ${recordCount} records successfully.") @@ -268,8 +260,8 @@ def prepareStatements(Connection conn) { ''' def INSERT_INTERNAL_COMMENT_SQL = ''' INSERT INTO internal_comment ( - internal_comment_id, comment, audience_scope, create_user, create_date - ) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?) + internal_comment_id, comment, audience_scope, create_user, create_date, update_date + ) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?, ?) RETURNING internal_comment_id ''' def INSERT_INITIATIVE_AGREEMENT_INTERNAL_COMMENT_SQL = ''' @@ -302,22 +294,44 @@ def toSqlTimestamp(String timestampString) { } } -def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt) { +def getUserName(PreparedStatement stmt, int userId) { + ResultSet rs = null + String userName = null + + try { + stmt.setInt(1, userId) + rs = stmt.executeQuery() + + if (rs.next()) { + userName = rs.getString('keycloak_username') + } else { + log.warn("No username found for user_id: ${userId}") + } + } catch (Exception e) { + log.error("Error while fetching username for user_id: ${userId}", e) + } finally { + if (rs != null) rs.close() + } + + return userName +} + +def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt, PreparedStatement getUserNameStmt) { def toTransactionId = null if (currentStatus == 'Approved') { - toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id')) + toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'), getUserNameStmt) } return toTransactionId } -def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId) { +def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId, PreparedStatement getUserNameStmt) { stmt.setInt(1, rs.getInt('compliance_units')) stmt.setInt(2, orgId) stmt.setString(3, action) stmt.setDate(4, rs.getDate('transaction_effective_date') ?: rs.getDate('agreement_date')) - stmt.setInt(5, rs.getInt('create_user')) + stmt.setString(5, getUserName(getUserNameStmt, rs.getInt('create_user'))) stmt.setTimestamp(6, rs.getTimestamp('create_date')) def result = stmt.executeQuery() @@ -356,20 +370,24 @@ def processHistory(Integer initiativeAgreementId, List creditTradeHistory, Prepa } -def processInternalComments(Integer initiativeAgreementId, List internalComments, +def processInternalComments(Integer initiativeAgreementId, PreparedStatement sourceInternalCommentStmt, PreparedStatement internalCommentStmt, + PreparedStatement getUserNameStmt, PreparedStatement initiativeAgreementInternalCommentStmt) { - if (!internalComments) return - - internalComments.each { comment -> - if (!comment) return // Skip null comments - + // Fetch internal comments + sourceInternalCommentStmt.setInt(1, initiativeAgreementId) + ResultSet internalCommentResult = sourceInternalCommentStmt.executeQuery() + while (internalCommentResult.next()) { try { // Insert the internal comment - internalCommentStmt.setString(1, comment.credit_trade_comment ?: '') - internalCommentStmt.setString(2, getAudienceScope(comment.role_names ?: '')) - internalCommentStmt.setInt(3, comment.create_user_id ?: null) - internalCommentStmt.setTimestamp(4, toSqlTimestamp(comment.create_timestamp ?: '2013-01-01T00:00:00Z')) + internalCommentStmt.setString(1, internalCommentResult.getString('credit_trade_comment') ?: '') + internalCommentStmt.setString(2, getAudienceScope(internalCommentResult.getString('role_names') ?: '')) + internalCommentStmt.setString(3, + getUserName(getUserNameStmt, internalCommentResult.getInt('create_user_id') ?: null)) + internalCommentStmt.setTimestamp(4, + internalCommentResult.getTimestamp('create_timestamp') ?: null) + internalCommentStmt.setTimestamp(5, + internalCommentResult.getTimestamp('update_timestamp') ?: null) def internalCommentId = null def commentResult = internalCommentStmt.executeQuery() @@ -387,7 +405,8 @@ def processInternalComments(Integer initiativeAgreementId, List internalComments log.error("Error processing internal comment for initiative-agreement ${initiativeAgreementId}: ${e.getMessage()}", e) } } - } + internalCommentResult.close() +} // Helper function to determine audience scope based on role names def getAudienceScope(String roleNames) { @@ -404,7 +423,7 @@ def getAudienceScope(String roleNames) { } def insertInitiativeAgreement(ResultSet rs, PreparedStatement initiativeAgreementStmt, - Long toTransactionId, Map preparedData, Connection conn) { + Long toTransactionId, Map preparedData, Connection conn, PreparedStatement getUserNameStmt, PreparedStatement commentStmt) { // Check for duplicates in the `initiative_agreement` table def initiativeAgreementId = rs.getInt('initiative_agreement_id') def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM initiative_agreement WHERE initiative_agreement_id = ?') @@ -419,14 +438,20 @@ def insertInitiativeAgreement(ResultSet rs, PreparedStatement initiativeAgreemen log.warn("Duplicate initiative_agreement detected with initiative_agreement_id: ${initiativeAgreementId}, skipping insertion.") return null } - + // Fetch comments + commentStmt.setInt(1, initiativeAgreementId) + ResultSet commentResult = commentStmt.executeQuery() + def govComment = null + if (commentResult.next()) { + govComment = commentResult.getString('gov_comment') + } // Proceed with insertion if no duplicate exists def statusId = getStatusId(rs.getString('current_status'), preparedData) initiativeAgreementStmt.setInt(1, rs.getInt('to_organization_id')) initiativeAgreementStmt.setObject(2, toTransactionId) initiativeAgreementStmt.setTimestamp(3, rs.getTimestamp('transaction_effective_date')) initiativeAgreementStmt.setInt(4, rs.getInt('compliance_units')) - initiativeAgreementStmt.setString(5, rs.getString('gov_comment')) + initiativeAgreementStmt.setString(5, govComment) initiativeAgreementStmt.setObject(6, statusId) initiativeAgreementStmt.setTimestamp(7, rs.getTimestamp('create_date')) initiativeAgreementStmt.setTimestamp(8, rs.getTimestamp('update_date')) diff --git a/etl/nifi_scripts/transfer.groovy b/etl/nifi_scripts/transfer.groovy index b74f66cff..f5361a286 100644 --- a/etl/nifi_scripts/transfer.groovy +++ b/etl/nifi_scripts/transfer.groovy @@ -72,9 +72,7 @@ SELECT ct.number_of_credits, ctc.category, cts.status, - ctzr.description, - ctt.the_type, - internal_comment.role_names; + ctt.the_type; """ def COMMENT_QUERY = ''' @@ -189,7 +187,7 @@ try { recommendationValue = 'Record' // matches "Record" in the transfer_recommendation_enum } else if (creditTradeHistoryJson.any { it.transfer_status == 'Not Recommended' }) { recommendationValue = 'Refuse' // matches "Refuse" in the transfer_recommendation_enum - } + } // Only if transfer does not exist, proceed to create transactions and then insert the transfer. def (fromTransactionId, toTransactionId) = processTransactions(resultSet.getString('current_status'), @@ -200,7 +198,7 @@ try { fromTransactionId, toTransactionId, preparedData, destinationConn, recommendationValue, getUserNameStmt) if (transferId) { - processHistory(transferId, creditTradeHistoryJson, statements.historyStmt, preparedData, getUserNameStmt) + processHistory(transferId, creditTradeHistoryJson, statements.historyStmt, preparedData) processInternalComments(transferId, internalCommentStmt, statements.internalCommentStmt, getUserNameStmt, statements.transferInternalCommentStmt) } else { @@ -225,6 +223,7 @@ try { \$\$ LANGUAGE plpgsql; """) destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate') + destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count') destinationConn.commit() log.debug("Processed ${recordCount} records successfully.") @@ -428,7 +427,7 @@ def transferExists(Connection conn, int transferId) { return count > 0 } -def processHistory(Integer transferId, List creditTradeHistory, PreparedStatement historyStmt, Map preparedData, PreparedStatement getUserNameStmt) { +def processHistory(Integer transferId, List creditTradeHistory, PreparedStatement historyStmt, Map preparedData) { if (!creditTradeHistory) return // Sort the records by create_timestamp to preserve chronological order @@ -499,7 +498,7 @@ def processInternalComments(Integer transferId, PreparedStatement sourceInternal } } internalCommentResult.close() - } +} // Helper function to determine audience scope based on role names def getAudienceScope(String roleNames) { @@ -569,6 +568,6 @@ def insertTransfer(ResultSet rs, PreparedStatement transferStmt, PreparedStateme transferStmt.setInt(19, rs.getInt('transfer_id')) def result = transferStmt.executeQuery() return result.next() ? result.getInt('transfer_id') : null - } +} log.warn('**** COMPLETED TRANSFER ETL ****')