Skip to content

Commit

Permalink
Merge pull request #1723 from bcgov/fix/prashanth-transactions-commen…
Browse files Browse the repository at this point in the history
…ts-fix-1702

fix: Comments on Admin adjustment and Initiative agreements from government users not showing in TFRS to LCFS Portal PROD ETL #1702
  • Loading branch information
prv-proton authored Jan 17, 2025
2 parents 3af0a64 + 305e969 commit 30c4d17
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 201 deletions.
220 changes: 122 additions & 98 deletions etl/nifi_scripts/adminAdjTrxn.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,6 @@ import java.sql.Timestamp
log.warn("**** STARTED ADMIN ADJUSTMENT ETL ****")

def SOURCE_QUERY = """
WITH
internal_comment AS (
SELECT
ctc.id,
ctc.credit_trade_id,
ctc.credit_trade_comment,
ctc.create_user_id,
ctc.create_timestamp,
STRING_AGG (r."name", '; ') AS role_names
FROM
credit_trade_comment ctc
JOIN "user" u ON u.id = ctc.create_user_id
AND u.organization_id = 1
AND ctc.is_privileged_access = TRUE
JOIN user_role ur ON ur.user_id = u.id
JOIN "role" r ON ur.role_id = r.id
GROUP BY
ctc.id,
ctc.credit_trade_id,
ctc.credit_trade_comment,
ctc.create_user_id,
ctc.create_timestamp
ORDER BY
ctc.credit_trade_id,
ctc.create_timestamp
)
SELECT
ct.id AS admin_adjustment_id,
ct.respondent_id AS to_organization_id,
Expand All @@ -44,10 +18,6 @@ WITH
ct.update_user_id as update_user,
ct.update_timestamp as update_date,
ct.create_timestamp as create_date,
-- Aggregate comments from government with internal comment handling
STRING_AGG (DISTINCT gov_ctc.credit_trade_comment, '; ') AS gov_comment,
-- JSON aggregation for internal comments
json_agg (row_to_json (internal_comment)) AS internal_comments,
-- JSON aggregation for credit trade history
json_agg (
json_build_object (
Expand Down Expand Up @@ -77,41 +47,6 @@ WITH
JOIN credit_trade_type ctt ON ct.type_id = ctt.id
LEFT OUTER JOIN credit_trade_category ctc ON ct.trade_category_id = ctc.id
JOIN credit_trade_status cts ON ct.status_id = cts.id
LEFT JOIN credit_trade_zero_reason ctzr ON ctzr.id = ct.zero_reason_id
AND ctzr.reason = 'Internal'
-- Join for Initiator Comments
LEFT JOIN credit_trade_comment from_ctc ON from_ctc.credit_trade_id = ct.id
AND from_ctc.create_user_id IN (
SELECT
u.id
FROM
"user" u
WHERE
u.organization_id = ct.initiator_id
)
-- Join for Respondent Comments
LEFT JOIN credit_trade_comment to_ctc ON to_ctc.credit_trade_id = ct.id
AND to_ctc.create_user_id IN (
SELECT
u.id
FROM
"user" u
WHERE
u.organization_id = ct.respondent_id
)
-- Join for Government Comments
LEFT JOIN credit_trade_comment gov_ctc ON gov_ctc.credit_trade_id = ct.id
AND gov_ctc.create_user_id IN (
SELECT
u.id
FROM
"user" u
WHERE
u.organization_id = 1
AND gov_ctc.is_privileged_access = FALSE
)
-- Join the internal comment logic for role-based filtering and audience_scope
LEFT JOIN internal_comment ON internal_comment.credit_trade_id = ct.id
-- Join for credit trade history
LEFT JOIN credit_trade_history cth ON cth.credit_trade_id = ct.id
JOIN credit_trade_status cts_history ON cth.status_id = cts_history.id
Expand All @@ -123,10 +58,49 @@ WITH
ct.date_of_written_agreement,
ct.trade_effective_date,
ct.number_of_credits,
cts.status,
ctzr.description,
internal_comment.role_names;
"""
cts.status;
"""
def COMMENT_QUERY = '''
SELECT
ct.id AS credit_trade_id,
MAX(CASE
WHEN u.organization_id = 1 AND ctc.is_privileged_access = FALSE THEN ctc.credit_trade_comment
END) AS gov_comment
FROM
credit_trade ct
LEFT JOIN credit_trade_comment ctc ON ctc.credit_trade_id = ct.id
LEFT JOIN "user" u ON ctc.create_user_id = u.id
WHERE
ct.id = ?
GROUP BY
ct.id;
'''

def INTERNAL_COMMENT_QUERY = """
SELECT
ctc.id,
ctc.credit_trade_id,
ctc.credit_trade_comment,
ctc.create_user_id,
ctc.create_timestamp,
ctc.update_timestamp,
STRING_AGG (r."name", '; ') AS role_names
FROM
credit_trade_comment ctc
JOIN "user" u ON u.id = ctc.create_user_id
AND u.organization_id = 1
AND ctc.is_privileged_access = TRUE
JOIN user_role ur ON ur.user_id = u.id
JOIN "role" r ON ur.role_id = r.id
WHERE ctc.credit_trade_id = ?
GROUP BY
ctc.id, ctc.credit_trade_id, ctc.credit_trade_comment,
ctc.create_user_id, ctc.create_timestamp
ORDER BY
ctc.credit_trade_id, ctc.create_timestamp;
"""

def USER_ID_QUERY = 'select keycloak_username from user_profile up where user_profile_id = ? limit 1'

// Fetch connections to both the source and destination databases
def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb')
Expand All @@ -146,6 +120,7 @@ try {
def statements = prepareStatements(destinationConn)

destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;')
destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_transaction_count() CASCADE;')
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_transaction_aggregate()
RETURNS void AS \$\$
Expand All @@ -154,31 +129,39 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_mv_transaction_count()
RETURNS void AS \$\$
BEGIN
-- Temporarily disable the materialized view refresh
END;
\$\$ LANGUAGE plpgsql;
""")

PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY)
PreparedStatement commentStmt = sourceConn.prepareStatement(COMMENT_QUERY)
PreparedStatement internalCommentStmt = sourceConn.prepareStatement(INTERNAL_COMMENT_QUERY)
PreparedStatement getUserNameStmt = destinationConn.prepareStatement(USER_ID_QUERY)
ResultSet resultSet = sourceStmt.executeQuery()

int recordCount = 0

while (resultSet.next()) {
recordCount++
def jsonSlurper = new JsonSlurper()
def internalComments = resultSet.getString('internal_comments')
def creditTradeHistory = resultSet.getString('credit_trade_history')

def internalCommentsJson = internalComments ? jsonSlurper.parseText(internalComments) : []
def creditTradeHistoryJson = creditTradeHistory ? jsonSlurper.parseText(creditTradeHistory) : []

def toTransactionId = processTransactions(resultSet.getString('current_status'),
resultSet, statements.transactionStmt)
resultSet, statements.transactionStmt, getUserNameStmt)

def adminAdjustmentId = insertadminAdjustment(resultSet, statements.adminAdjustmentStmt,
toTransactionId, preparedData, destinationConn)
toTransactionId, preparedData, destinationConn, getUserNameStmt, commentStmt)

if (adminAdjustmentId) {
processHistory(adminAdjustmentId, creditTradeHistoryJson, statements.historyStmt, preparedData)
processInternalComments(adminAdjustmentId, internalCommentsJson, statements.internalCommentStmt,
statements.adminAdjustmentInternalCommentStmt)
processInternalComments(adminAdjustmentId, internalCommentStmt, statements.internalCommentStmt,
getUserNameStmt, statements.adminAdjustmentInternalCommentStmt)
} else {
log.warn("admin-adjustment not inserted for record: ${resultSet.getInt('admin_adjustment_id')}")
}
Expand All @@ -192,7 +175,16 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_mv_transaction_count()
RETURNS void AS \$\$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count;
END;
\$\$ LANGUAGE plpgsql;
""")
destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate')
destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count')

destinationConn.commit()
log.debug("Processed ${recordCount} records successfully.")
Expand Down Expand Up @@ -268,8 +260,8 @@ def prepareStatements(Connection conn) {
'''
def INSERT_INTERNAL_COMMENT_SQL = '''
INSERT INTO internal_comment (
internal_comment_id, comment, audience_scope, create_user, create_date
) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?)
internal_comment_id, comment, audience_scope, create_user, create_date, update_date
) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?, ?)
RETURNING internal_comment_id
'''
def INSERT_admin_adjustment_INTERNAL_COMMENT_SQL = '''
Expand Down Expand Up @@ -302,22 +294,44 @@ def toSqlTimestamp(String timestampString) {
}
}

def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt) {
def getUserName(PreparedStatement stmt, int userId) {
ResultSet rs = null
String userName = null

try {
stmt.setInt(1, userId)
rs = stmt.executeQuery()

if (rs.next()) {
userName = rs.getString('keycloak_username')
} else {
log.warn("No username found for user_id: ${userId}")
}
} catch (Exception e) {
log.error("Error while fetching username for user_id: ${userId}", e)
} finally {
if (rs != null) rs.close()
}

return userName
}

def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt, PreparedStatement getUserNameStmt) {
def toTransactionId = null

if (currentStatus == 'Approved') {
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'))
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'), getUserNameStmt)
}

return toTransactionId
}

def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId) {
def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId, PreparedStatement getUserNameStmt) {
stmt.setInt(1, rs.getInt('compliance_units'))
stmt.setInt(2, orgId)
stmt.setString(3, action)
stmt.setDate(4, rs.getDate('transaction_effective_date') ?: rs.getDate('agreement_date'))
stmt.setInt(5, rs.getInt('create_user'))
stmt.setString(5, getUserName(getUserNameStmt, rs.getInt('create_user')))
stmt.setTimestamp(6, rs.getTimestamp('create_date'))

def result = stmt.executeQuery()
Expand Down Expand Up @@ -355,21 +369,24 @@ def processHistory(Integer adminAdjustmentId, List creditTradeHistory, PreparedS
historyStmt.executeBatch()
}


def processInternalComments(Integer adminAdjustmentId, List internalComments,
def processInternalComments(Integer adminAdjustmentId, PreparedStatement sourceInternalCommentStmt,
PreparedStatement internalCommentStmt,
PreparedStatement getUserNameStmt,
PreparedStatement adminAdjustmentInternalCommentStmt) {
if (!internalComments) return

internalComments.each { comment ->
if (!comment) return // Skip null comments

// Fetch internal comments
sourceInternalCommentStmt.setInt(1, adminAdjustmentId)
ResultSet internalCommentResult = sourceInternalCommentStmt.executeQuery()
while (internalCommentResult.next()) {
try {
// Insert the internal comment
internalCommentStmt.setString(1, comment.credit_trade_comment ?: '')
internalCommentStmt.setString(2, getAudienceScope(comment.role_names ?: ''))
internalCommentStmt.setInt(3, comment.create_user_id ?: null)
internalCommentStmt.setTimestamp(4, toSqlTimestamp(comment.create_timestamp ?: '2013-01-01T00:00:00Z'))
internalCommentStmt.setString(1, internalCommentResult.getString('credit_trade_comment') ?: '')
internalCommentStmt.setString(2, getAudienceScope(internalCommentResult.getString('role_names') ?: ''))
internalCommentStmt.setString(3,
getUserName(getUserNameStmt, internalCommentResult.getInt('create_user_id') ?: null))
internalCommentStmt.setTimestamp(4,
internalCommentResult.getTimestamp('create_timestamp') ?: null)
internalCommentStmt.setTimestamp(5,
internalCommentResult.getTimestamp('update_timestamp') ?: null)

def internalCommentId = null
def commentResult = internalCommentStmt.executeQuery()
Expand All @@ -380,14 +397,15 @@ def processInternalComments(Integer adminAdjustmentId, List internalComments,
adminAdjustmentInternalCommentStmt.setInt(1, adminAdjustmentId)
adminAdjustmentInternalCommentStmt.setInt(2, internalCommentId)
adminAdjustmentInternalCommentStmt.executeUpdate()
}
}

commentResult.close()
} catch (Exception e) {
log.error("Error processing internal comment for admin-adjustment ${adminAdjustmentId}: ${e.getMessage()}", e)
}
}
}
internalCommentResult.close()
}

// Helper function to determine audience scope based on role names
def getAudienceScope(String roleNames) {
Expand All @@ -404,7 +422,7 @@ def getAudienceScope(String roleNames) {
}

def insertadminAdjustment(ResultSet rs, PreparedStatement adminAdjustmentStmt,
Long toTransactionId, Map preparedData, Connection conn) {
Long toTransactionId, Map preparedData, Connection conn, PreparedStatement getUserNameStmt, PreparedStatement commentStmt) {
// Check for duplicates in the `admin_adjustment` table
def adminAdjustmentId = rs.getInt('admin_adjustment_id')
def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM admin_adjustment WHERE admin_adjustment_id = ?')
Expand All @@ -419,14 +437,20 @@ def insertadminAdjustment(ResultSet rs, PreparedStatement adminAdjustmentStmt,
log.warn("Duplicate admin_adjustment detected with admin_adjustment_id: ${adminAdjustmentId}, skipping insertion.")
return null
}

// Fetch comments
commentStmt.setInt(1, adminAdjustmentId)
ResultSet commentResult = commentStmt.executeQuery()
def govComment = null
if (commentResult.next()) {
govComment = commentResult.getString('gov_comment')
}
// Proceed with insertion if no duplicate exists
def statusId = getStatusId(rs.getString('current_status'), preparedData)
adminAdjustmentStmt.setInt(1, rs.getInt('to_organization_id'))
adminAdjustmentStmt.setObject(2, toTransactionId)
adminAdjustmentStmt.setTimestamp(3, rs.getTimestamp('transaction_effective_date'))
adminAdjustmentStmt.setInt(4, rs.getInt('compliance_units'))
adminAdjustmentStmt.setString(5, rs.getString('gov_comment'))
adminAdjustmentStmt.setString(5, govComment)
adminAdjustmentStmt.setObject(6, statusId)
adminAdjustmentStmt.setTimestamp(7, rs.getTimestamp('create_date'))
adminAdjustmentStmt.setTimestamp(8, rs.getTimestamp('update_date'))
Expand Down
Loading

0 comments on commit 30c4d17

Please sign in to comment.