Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Comments on Admin adjustment and Initiative agreements from government users not showing in TFRS to LCFS Portal PROD ETL #1702 #1723

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 122 additions & 98 deletions etl/nifi_scripts/adminAdjTrxn.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,6 @@ import java.sql.Timestamp
log.warn("**** STARTED ADMIN ADJUSTMENT ETL ****")

def SOURCE_QUERY = """
WITH
internal_comment AS (
SELECT
ctc.id,
ctc.credit_trade_id,
ctc.credit_trade_comment,
ctc.create_user_id,
ctc.create_timestamp,
STRING_AGG (r."name", '; ') AS role_names
FROM
credit_trade_comment ctc
JOIN "user" u ON u.id = ctc.create_user_id
AND u.organization_id = 1
AND ctc.is_privileged_access = TRUE
JOIN user_role ur ON ur.user_id = u.id
JOIN "role" r ON ur.role_id = r.id
GROUP BY
ctc.id,
ctc.credit_trade_id,
ctc.credit_trade_comment,
ctc.create_user_id,
ctc.create_timestamp
ORDER BY
ctc.credit_trade_id,
ctc.create_timestamp
)
SELECT
ct.id AS admin_adjustment_id,
ct.respondent_id AS to_organization_id,
Expand All @@ -44,10 +18,6 @@ WITH
ct.update_user_id as update_user,
ct.update_timestamp as update_date,
ct.create_timestamp as create_date,
-- Aggregate comments from government with internal comment handling
STRING_AGG (DISTINCT gov_ctc.credit_trade_comment, '; ') AS gov_comment,
-- JSON aggregation for internal comments
json_agg (row_to_json (internal_comment)) AS internal_comments,
-- JSON aggregation for credit trade history
json_agg (
json_build_object (
Expand Down Expand Up @@ -77,41 +47,6 @@ WITH
JOIN credit_trade_type ctt ON ct.type_id = ctt.id
LEFT OUTER JOIN credit_trade_category ctc ON ct.trade_category_id = ctc.id
JOIN credit_trade_status cts ON ct.status_id = cts.id
LEFT JOIN credit_trade_zero_reason ctzr ON ctzr.id = ct.zero_reason_id
AND ctzr.reason = 'Internal'
-- Join for Initiator Comments
LEFT JOIN credit_trade_comment from_ctc ON from_ctc.credit_trade_id = ct.id
AND from_ctc.create_user_id IN (
SELECT
u.id
FROM
"user" u
WHERE
u.organization_id = ct.initiator_id
)
-- Join for Respondent Comments
LEFT JOIN credit_trade_comment to_ctc ON to_ctc.credit_trade_id = ct.id
AND to_ctc.create_user_id IN (
SELECT
u.id
FROM
"user" u
WHERE
u.organization_id = ct.respondent_id
)
-- Join for Government Comments
LEFT JOIN credit_trade_comment gov_ctc ON gov_ctc.credit_trade_id = ct.id
AND gov_ctc.create_user_id IN (
SELECT
u.id
FROM
"user" u
WHERE
u.organization_id = 1
AND gov_ctc.is_privileged_access = FALSE
)
-- Join the internal comment logic for role-based filtering and audience_scope
LEFT JOIN internal_comment ON internal_comment.credit_trade_id = ct.id
-- Join for credit trade history
LEFT JOIN credit_trade_history cth ON cth.credit_trade_id = ct.id
JOIN credit_trade_status cts_history ON cth.status_id = cts_history.id
Expand All @@ -123,10 +58,49 @@ WITH
ct.date_of_written_agreement,
ct.trade_effective_date,
ct.number_of_credits,
cts.status,
ctzr.description,
internal_comment.role_names;
"""
cts.status;
"""
def COMMENT_QUERY = '''
SELECT
ct.id AS credit_trade_id,
MAX(CASE
WHEN u.organization_id = 1 AND ctc.is_privileged_access = FALSE THEN ctc.credit_trade_comment
END) AS gov_comment
FROM
credit_trade ct
LEFT JOIN credit_trade_comment ctc ON ctc.credit_trade_id = ct.id
LEFT JOIN "user" u ON ctc.create_user_id = u.id
WHERE
ct.id = ?
GROUP BY
ct.id;
'''

def INTERNAL_COMMENT_QUERY = """
SELECT
ctc.id,
ctc.credit_trade_id,
ctc.credit_trade_comment,
ctc.create_user_id,
ctc.create_timestamp,
ctc.update_timestamp,
STRING_AGG (r."name", '; ') AS role_names
FROM
credit_trade_comment ctc
JOIN "user" u ON u.id = ctc.create_user_id
AND u.organization_id = 1
AND ctc.is_privileged_access = TRUE
JOIN user_role ur ON ur.user_id = u.id
JOIN "role" r ON ur.role_id = r.id
WHERE ctc.credit_trade_id = ?
GROUP BY
ctc.id, ctc.credit_trade_id, ctc.credit_trade_comment,
ctc.create_user_id, ctc.create_timestamp
ORDER BY
ctc.credit_trade_id, ctc.create_timestamp;
"""

def USER_ID_QUERY = 'select keycloak_username from user_profile up where user_profile_id = ? limit 1'

// Fetch connections to both the source and destination databases
def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb')
Expand All @@ -146,6 +120,7 @@ try {
def statements = prepareStatements(destinationConn)

destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_transaction_aggregate() CASCADE;')
destinationConn.createStatement().execute('DROP FUNCTION IF EXISTS refresh_mv_transaction_count() CASCADE;')
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_transaction_aggregate()
RETURNS void AS \$\$
Expand All @@ -154,31 +129,39 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_mv_transaction_count()
RETURNS void AS \$\$
BEGIN
-- Temporarily disable the materialized view refresh
END;
\$\$ LANGUAGE plpgsql;
""")

PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY)
PreparedStatement commentStmt = sourceConn.prepareStatement(COMMENT_QUERY)
PreparedStatement internalCommentStmt = sourceConn.prepareStatement(INTERNAL_COMMENT_QUERY)
PreparedStatement getUserNameStmt = destinationConn.prepareStatement(USER_ID_QUERY)
ResultSet resultSet = sourceStmt.executeQuery()

int recordCount = 0

while (resultSet.next()) {
recordCount++
def jsonSlurper = new JsonSlurper()
def internalComments = resultSet.getString('internal_comments')
def creditTradeHistory = resultSet.getString('credit_trade_history')

def internalCommentsJson = internalComments ? jsonSlurper.parseText(internalComments) : []
def creditTradeHistoryJson = creditTradeHistory ? jsonSlurper.parseText(creditTradeHistory) : []

def toTransactionId = processTransactions(resultSet.getString('current_status'),
resultSet, statements.transactionStmt)
resultSet, statements.transactionStmt, getUserNameStmt)

def adminAdjustmentId = insertadminAdjustment(resultSet, statements.adminAdjustmentStmt,
toTransactionId, preparedData, destinationConn)
toTransactionId, preparedData, destinationConn, getUserNameStmt, commentStmt)

if (adminAdjustmentId) {
processHistory(adminAdjustmentId, creditTradeHistoryJson, statements.historyStmt, preparedData)
processInternalComments(adminAdjustmentId, internalCommentsJson, statements.internalCommentStmt,
statements.adminAdjustmentInternalCommentStmt)
processInternalComments(adminAdjustmentId, internalCommentStmt, statements.internalCommentStmt,
getUserNameStmt, statements.adminAdjustmentInternalCommentStmt)
} else {
log.warn("admin-adjustment not inserted for record: ${resultSet.getInt('admin_adjustment_id')}")
}
Expand All @@ -192,7 +175,16 @@ try {
END;
\$\$ LANGUAGE plpgsql;
""")
destinationConn.createStatement().execute("""
CREATE OR REPLACE FUNCTION refresh_mv_transaction_count()
RETURNS void AS \$\$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count;
END;
\$\$ LANGUAGE plpgsql;
""")
destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_aggregate')
destinationConn.createStatement().execute('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_transaction_count')

destinationConn.commit()
log.debug("Processed ${recordCount} records successfully.")
Expand Down Expand Up @@ -268,8 +260,8 @@ def prepareStatements(Connection conn) {
'''
def INSERT_INTERNAL_COMMENT_SQL = '''
INSERT INTO internal_comment (
internal_comment_id, comment, audience_scope, create_user, create_date
) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?)
internal_comment_id, comment, audience_scope, create_user, create_date, update_date
) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?, ?)
RETURNING internal_comment_id
'''
def INSERT_admin_adjustment_INTERNAL_COMMENT_SQL = '''
Expand Down Expand Up @@ -302,22 +294,44 @@ def toSqlTimestamp(String timestampString) {
}
}

def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt) {
def getUserName(PreparedStatement stmt, int userId) {
ResultSet rs = null
String userName = null

try {
stmt.setInt(1, userId)
rs = stmt.executeQuery()

if (rs.next()) {
userName = rs.getString('keycloak_username')
} else {
log.warn("No username found for user_id: ${userId}")
}
} catch (Exception e) {
log.error("Error while fetching username for user_id: ${userId}", e)
} finally {
if (rs != null) rs.close()
}

return userName
}

def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt, PreparedStatement getUserNameStmt) {
def toTransactionId = null

if (currentStatus == 'Approved') {
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'))
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'), getUserNameStmt)
}

return toTransactionId
}

def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId) {
def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId, PreparedStatement getUserNameStmt) {
stmt.setInt(1, rs.getInt('compliance_units'))
stmt.setInt(2, orgId)
stmt.setString(3, action)
stmt.setDate(4, rs.getDate('transaction_effective_date') ?: rs.getDate('agreement_date'))
stmt.setInt(5, rs.getInt('create_user'))
stmt.setString(5, getUserName(getUserNameStmt, rs.getInt('create_user')))
stmt.setTimestamp(6, rs.getTimestamp('create_date'))

def result = stmt.executeQuery()
Expand Down Expand Up @@ -355,21 +369,24 @@ def processHistory(Integer adminAdjustmentId, List creditTradeHistory, PreparedS
historyStmt.executeBatch()
}


def processInternalComments(Integer adminAdjustmentId, List internalComments,
def processInternalComments(Integer adminAdjustmentId, PreparedStatement sourceInternalCommentStmt,
PreparedStatement internalCommentStmt,
PreparedStatement getUserNameStmt,
PreparedStatement adminAdjustmentInternalCommentStmt) {
if (!internalComments) return

internalComments.each { comment ->
if (!comment) return // Skip null comments

// Fetch internal comments
sourceInternalCommentStmt.setInt(1, adminAdjustmentId)
ResultSet internalCommentResult = sourceInternalCommentStmt.executeQuery()
while (internalCommentResult.next()) {
try {
// Insert the internal comment
internalCommentStmt.setString(1, comment.credit_trade_comment ?: '')
internalCommentStmt.setString(2, getAudienceScope(comment.role_names ?: ''))
internalCommentStmt.setInt(3, comment.create_user_id ?: null)
internalCommentStmt.setTimestamp(4, toSqlTimestamp(comment.create_timestamp ?: '2013-01-01T00:00:00Z'))
internalCommentStmt.setString(1, internalCommentResult.getString('credit_trade_comment') ?: '')
internalCommentStmt.setString(2, getAudienceScope(internalCommentResult.getString('role_names') ?: ''))
internalCommentStmt.setString(3,
getUserName(getUserNameStmt, internalCommentResult.getInt('create_user_id') ?: null))
internalCommentStmt.setTimestamp(4,
internalCommentResult.getTimestamp('create_timestamp') ?: null)
internalCommentStmt.setTimestamp(5,
internalCommentResult.getTimestamp('update_timestamp') ?: null)

def internalCommentId = null
def commentResult = internalCommentStmt.executeQuery()
Expand All @@ -380,14 +397,15 @@ def processInternalComments(Integer adminAdjustmentId, List internalComments,
adminAdjustmentInternalCommentStmt.setInt(1, adminAdjustmentId)
adminAdjustmentInternalCommentStmt.setInt(2, internalCommentId)
adminAdjustmentInternalCommentStmt.executeUpdate()
}
}

commentResult.close()
} catch (Exception e) {
log.error("Error processing internal comment for admin-adjustment ${adminAdjustmentId}: ${e.getMessage()}", e)
}
}
}
internalCommentResult.close()
}

// Helper function to determine audience scope based on role names
def getAudienceScope(String roleNames) {
Expand All @@ -404,7 +422,7 @@ def getAudienceScope(String roleNames) {
}

def insertadminAdjustment(ResultSet rs, PreparedStatement adminAdjustmentStmt,
Long toTransactionId, Map preparedData, Connection conn) {
Long toTransactionId, Map preparedData, Connection conn, PreparedStatement getUserNameStmt, PreparedStatement commentStmt) {
// Check for duplicates in the `admin_adjustment` table
def adminAdjustmentId = rs.getInt('admin_adjustment_id')
def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM admin_adjustment WHERE admin_adjustment_id = ?')
Expand All @@ -419,14 +437,20 @@ def insertadminAdjustment(ResultSet rs, PreparedStatement adminAdjustmentStmt,
log.warn("Duplicate admin_adjustment detected with admin_adjustment_id: ${adminAdjustmentId}, skipping insertion.")
return null
}

// Fetch comments
commentStmt.setInt(1, adminAdjustmentId)
ResultSet commentResult = commentStmt.executeQuery()
def govComment = null
if (commentResult.next()) {
govComment = commentResult.getString('gov_comment')
}
// Proceed with insertion if no duplicate exists
def statusId = getStatusId(rs.getString('current_status'), preparedData)
adminAdjustmentStmt.setInt(1, rs.getInt('to_organization_id'))
adminAdjustmentStmt.setObject(2, toTransactionId)
adminAdjustmentStmt.setTimestamp(3, rs.getTimestamp('transaction_effective_date'))
adminAdjustmentStmt.setInt(4, rs.getInt('compliance_units'))
adminAdjustmentStmt.setString(5, rs.getString('gov_comment'))
adminAdjustmentStmt.setString(5, govComment)
adminAdjustmentStmt.setObject(6, statusId)
adminAdjustmentStmt.setTimestamp(7, rs.getTimestamp('create_date'))
adminAdjustmentStmt.setTimestamp(8, rs.getTimestamp('update_date'))
Expand Down
Loading
Loading