diff --git a/backend/lcfs/db/migrations/versions/2025-01-15-22-48_5bc0ef48739a.py b/backend/lcfs/db/migrations/versions/2025-01-15-22-48_5bc0ef48739a.py new file mode 100644 index 000000000..5046cf1f2 --- /dev/null +++ b/backend/lcfs/db/migrations/versions/2025-01-15-22-48_5bc0ef48739a.py @@ -0,0 +1,44 @@ +"""add truck and marine transport mode + +Revision ID: 5bc0ef48739a +Revises: f78e53370ed2 +Create Date: 2025-01-15 22:48:43.582069 + +""" + +import sqlalchemy as sa +from alembic import op +from datetime import datetime + +# revision identifiers, used by Alembic. +revision = "5bc0ef48739a" +down_revision = "8119d12538df" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + current_time = datetime.now() + + # Insert Truck and Marine transport modes + op.execute( + """ + INSERT INTO transport_mode (transport_mode, create_date, update_date, create_user, update_user) + VALUES + ('Truck', '{}', '{}', 'no_user', 'no_user'), + ('Marine', '{}', '{}', 'no_user', 'no_user') + + """.format( + current_time, current_time, current_time, current_time + ) + ) + + +def downgrade() -> None: + # Remove Truck and Marine transport modes + op.execute( + """ + DELETE FROM transport_mode + WHERE transport_mode IN ('Truck', 'Marine') + """ + ) diff --git a/etl/database/nifi-registry-primary.mv.db b/etl/database/nifi-registry-primary.mv.db index f5b1bf947..700e83338 100644 Binary files a/etl/database/nifi-registry-primary.mv.db and b/etl/database/nifi-registry-primary.mv.db differ diff --git a/etl/nifi/conf/flow.json.gz b/etl/nifi/conf/flow.json.gz index e2e4eaaef..b9c271f27 100644 Binary files a/etl/nifi/conf/flow.json.gz and b/etl/nifi/conf/flow.json.gz differ diff --git a/etl/nifi/conf/flow.xml.gz b/etl/nifi/conf/flow.xml.gz index 203856fcb..13fa45db2 100644 Binary files a/etl/nifi/conf/flow.xml.gz and b/etl/nifi/conf/flow.xml.gz differ diff --git a/etl/nifi_scripts/fuel_code.groovy b/etl/nifi_scripts/fuel_code.groovy index 57de4f16f..11aa7017f 100644 --- a/etl/nifi_scripts/fuel_code.groovy +++ b/etl/nifi_scripts/fuel_code.groovy @@ -1,80 +1,321 @@ -import org.apache.nifi.processor.io.StreamCallback +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet import groovy.json.JsonSlurper -import groovy.json.JsonOutput - -def transformCallback = { inputStream, outputStream -> - try { - // Parse JSON input - def record = new JsonSlurper().parseText(inputStream.text) - - // Map the fuel_id to the corresponding new value - def fuelIdMapping = [ - 8 : 13, // Propane - 21 : 15, // Renewable naphtha - 10 : 14, // Renewable gasoline - 19 : 16, // Fossil-derived diesel - 11 : 17, // Fossil-derived gasoline - 20 : 17 // Fossil-derived gasoline - ] - - // Replace fuel_id if it matches one of the keys in the map - if (fuelIdMapping.containsKey(record.fuel_id)) { - record.fuel_id = fuelIdMapping[record.fuel_id] + +log.warn('**** STARTING FUEL CODE ETL ****') + +def fuelIdMapping = [ + 8 : 13, + 9 : 5, // TODO: need to double check this + 10 : 14, + 11 : 17, + 19 : 16, + 20 : 17, + 21 : 15, +] + +def transportModeMapping = [ + 1 : 6, + 3 : 7 +] + +def provinceStateMap = [ + // Canadian Provinces + 'BC': 'British Columbia', + 'AB': 'Alberta', + 'SK': 'Saskatchewan', + 'MB': 'Manitoba', + 'ON': 'Ontario', + 'QC': 'Quebec', + 'NL': 'Newfoundland and Labrador', + 'BRITISH COLUMBIA': 'British Columbia', + 'ALBERTA': 'Alberta', + 'SASKATCHEWAN': 'Saskatchewan', + 'MANITOBA': 'Manitoba', + 'ONTARIO': 'Ontario', + 'QUEBEC': 'Quebec', + 'NEWFOUNDLAND': 'Newfoundland and Labrador', + + // US States + 'CA': 'California', + 'CT': 'Connecticut', + 'GA': 'Georgia', + 'IA': 'Iowa', + 'IL': 'Illinois', + 'IN': 'Indiana', + 'KS': 'Kansas', + 'LA': 'Louisiana', + 'MN': 'Minnesota', + 'MS': 'Mississippi', + 'MO': 'Missouri', + 'MT': 'Montana', + 'ND': 'North Dakota', + 'NE': 'Nebraska', + 'NM': 'New Mexico', + 'OH': 'Ohio', + 'OK': 'Oklahoma', + 'OR': 'Oregon', + 'SD': 'South Dakota', + 'TX': 'Texas', + 'WA': 'Washington', + 'WI': 'Wisconsin', + 'WY': 'Wyoming', + 'CALIFORNIA': 'California', + 'GEORGIA': 'Georgia', + 'IOWA': 'Iowa', + 'ILLINOIS': 'Illinois', + 'INDIANA': 'Indiana', + 'KANSAS': 'Kansas', + 'LOUISIANA': 'Louisiana', + 'MINNESOTA': 'Minnesota', + 'MISSISSIPPI': 'Mississippi', + 'MISSOURI': 'Missouri', + 'MONTANA': 'Montana', + 'NORTH DAKOTA': 'North Dakota', + 'NEBRASKA': 'Nebraska', + 'NEW MEXICO': 'New Mexico', + 'OHIO': 'Ohio', + 'OKLAHOMA': 'Oklahoma', + 'OREGON': 'Oregon', + 'SOUTH DAKOTA': 'South Dakota', + 'TEXAS': 'Texas', + 'WASHINGTON': 'Washington', + 'WISCONSIN': 'Wisconsin', + 'WYOMING': 'Wyoming', + 'WYOMING.': 'Wyoming', // Add variant with period + 'CONNETICUT': 'Connecticut', // Add misspelled variant + 'CONNECTICUT': 'Connecticut', +] + +def fuelCodeQuery = ''' + SELECT + fc.*, + array_agg(DISTINCT finishedFtm.transport_mode_id) AS finished_fuel_transport_modes, + array_agg(DISTINCT feedstockFtf.transport_mode_id) AS feedstock_fuel_transport_modes + FROM + fuel_code fc + LEFT JOIN + fuel_transport_mode_fuel_code finishedFtm ON fc.id = finishedFtm.fuel_code_id + LEFT JOIN + feedstock_transport_mode_fuel_code feedstockFtf ON fc.id = feedstockFtf.fuel_code_id + GROUP BY + fc.id; +''' + +// Insert `fuel_code` into the target database +def insertFuelCodeSQL = ''' + INSERT INTO fuel_code ( + fuel_status_id, + fuel_suffix, + company, + carbon_intensity, + last_updated, + application_date, + approval_date, + fuel_type_id, + feedstock, + feedstock_location, + feedstock_misc, + facility_nameplate_capacity, + former_company, + create_date, + update_date, + effective_date, + expiration_date, + fuel_production_facility_city, + fuel_production_facility_province_state, + fuel_production_facility_country, + prefix_id, + edrms, + notes, + create_user, + update_user, + effective_status, + contact_name, + contact_email + ) + VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + 1, + '', + null, + null, + null, + true, + null, + null + ) + RETURNING fuel_code_id; +''' + +// Insert into `finished_fuel_transport_mode` +def insertFinishedFuelTransportModeSQL = ''' + INSERT INTO finished_fuel_transport_mode (fuel_code_id, transport_mode_id) + VALUES (?, ?); +''' + +// Insert into `feedstock_fuel_transport_mode` +def insertFeedstockFuelTransportModeSQL = ''' + INSERT INTO feedstock_fuel_transport_mode (fuel_code_id, transport_mode_id) + VALUES (?, ?); +''' + +// Fetch connections to both source and destination databases +def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') +def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') + +sourceConn = null +destinationConn = null + +try { + // Get connections + sourceConn = sourceDbcpService.getConnection() + destinationConn = destinationDbcpService.getConnection() + + fetchFuelCodes = sourceConn.prepareStatement(fuelCodeQuery) + fuelCodes = fetchFuelCodes.executeQuery() + + while (fuelCodes.next()) { + def fuelCodeVersion = fuelCodes.getString('fuel_code_version') + def fuelCodeVersionMinor = fuelCodes.getString('fuel_code_version_minor') + + def fuelStatusId = fuelCodes.getInt('status_id') + def fuelSuffix = "${fuelCodeVersion}.${fuelCodeVersionMinor}" + def company = fuelCodes.getString('company') + def carbonIntensity = fuelCodes.getDouble('carbon_intensity') + def lastUpdated = fuelCodes.getDate('update_timestamp') + def applicationDate = fuelCodes.getDate('application_date') + def approvalDate = fuelCodes.getDate('approval_date') + def fuelTypeId = fuelCodes.getInt('fuel_id') + def feedstock = fuelCodes.getString('feedstock') + def feedstockLocation = fuelCodes.getString('feedstock_location') + def feedstockMisc = fuelCodes.getString('feedstock_misc') + def facilityNameplateCapacity = fuelCodes.getDouble('facility_nameplate') + def formerCompany = fuelCodes.getString('former_company') + def createDate = fuelCodes.getDate('create_timestamp') + def updateDate = fuelCodes.getDate('update_timestamp') + def effectiveDate = fuelCodes.getDate('effective_date') + def expirationDate = fuelCodes.getDate('expiry_date') + + def facilityLocation = fuelCodes.getString('facility_location') + + def locationParts = (facilityLocation ?: '').split(',').collect { it.trim() } + def facilityCity = null + def facilityProvinceState = null + def facilityCountry = null + + if (locationParts.size() == 1) { + def location = locationParts[0].toUpperCase() + if (location == 'US CENTRAL') { + facilityCountry = 'United States of America' + } else if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } + } else if (locationParts.size() == 2) { + // First part is always city + facilityCity = locationParts[0] + + // Process second part - could be province/state or country + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + // If not a recognized province/state, treat as country + facilityCountry = locationParts[1] + } + } else if (locationParts.size() == 3) { + // First part is always city + facilityCity = locationParts[0] + + // Second part is province/state + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + facilityProvinceState = locationParts[1] + } + + // Third part is always country - expand USA to full name + def country = locationParts[2].toUpperCase() + if (country == 'USA') { + facilityCountry = 'United States of America' + } else { + facilityCountry = locationParts[2] + } } - // Map the fields from the source to the target schema - // The following fields are not used in the migration: fuel_code_id, facility_location, renewable_percentage, facility_nameplate_capacity_unit - def transformedRecord = [ - fuel_status_id : record.status_id, - prefix_id : 1, // BCLCF - fuel_suffix : "${record.fuel_code_version}.${record.fuel_code_version_minor}", - company : record.company, - carbon_intensity : record.carbon_intensity, - edrms : "", - last_updated : record.update_timestamp, - application_date : record.application_date, - approval_date : record.approval_date, - fuel_type_id : record.fuel_id, - feedstock : record.feedstock, - feedstock_location : record.feedstock_location, - feedstock_misc : record.feedstock_misc, - facility_nameplate_capacity : record.facility_nameplate, - former_company : record.former_company, - notes : null, - create_date : record.create_timestamp, - update_date : record.update_timestamp, - create_user : null, - update_user : null, - effective_date : record.effective_date, - expiration_date : record.expiry_date, - effective_status : true, - fuel_production_facility_city : null, - fuel_production_facility_province_state: null, - fuel_production_facility_country : null, - contact_name : null, - contact_email : null - ] - - // Write the transformed data back to the output - outputStream.write(JsonOutput.toJson(transformedRecord).getBytes("UTF-8")) - - } catch (Exception e) { - def recordId = record?.id - if (recordId) { - flowFile = session.putAttribute(flowFile, "failed_record_id", recordId.toString()) + def finishedModes = fuelCodes.getArray('finished_fuel_transport_modes')?.getArray() ?: [] + def feedstockModes = fuelCodes.getArray('feedstock_fuel_transport_modes')?.getArray() ?: [] + + // Insert `fuel_code` and get the new ID + def insertFuelCode = destinationConn.prepareStatement(insertFuelCodeSQL) + insertFuelCode.setInt(1, fuelStatusId) + insertFuelCode.setString(2, fuelSuffix) + insertFuelCode.setString(3, company) + insertFuelCode.setDouble(4, carbonIntensity) + insertFuelCode.setDate(5, lastUpdated) + insertFuelCode.setDate(6, applicationDate) + insertFuelCode.setDate(7, approvalDate) + insertFuelCode.setInt(8, fuelIdMapping[fuelTypeId] ?: fuelTypeId) + insertFuelCode.setString(9, feedstock) + insertFuelCode.setString(10, feedstockLocation) + insertFuelCode.setString(11, feedstockMisc) + insertFuelCode.setDouble(12, facilityNameplateCapacity) + insertFuelCode.setString(13, formerCompany) + insertFuelCode.setDate(14, createDate) + insertFuelCode.setDate(15, updateDate) + insertFuelCode.setDate(16, effectiveDate) + insertFuelCode.setDate(17, expirationDate) + insertFuelCode.setString(18, facilityCity) + insertFuelCode.setString(19, facilityProvinceState) + insertFuelCode.setString(20, facilityCountry) + + def rs = insertFuelCode.executeQuery() + def newFuelCodeId = rs.next() ? rs.getInt('fuel_code_id') : null + rs.close() + + // Insert finished fuel transport modes + finishedModes.each { mode -> + def insertFinishedMode = destinationConn.prepareStatement(insertFinishedFuelTransportModeSQL) + insertFinishedMode.setInt(1, newFuelCodeId) + insertFinishedMode.setInt(2, transportModeMapping[mode] ?: mode) + insertFinishedMode.executeUpdate() } - throw e - } -} -// Obtain the flowFile from the session -flowFile = session.get() -if (flowFile != null) { - try { - // Write the transformed data using the transformCallback - flowFile = session.write(flowFile, transformCallback as StreamCallback) - session.transfer(flowFile, REL_SUCCESS) - } catch (Exception e) { - session.transfer(flowFile, REL_FAILURE) + // Insert feedstock fuel transport modes + feedstockModes.each { mode -> + def insertFeedstockMode = destinationConn.prepareStatement(insertFeedstockFuelTransportModeSQL) + insertFeedstockMode.setInt(1, newFuelCodeId) + insertFeedstockMode.setInt(2, transportModeMapping[mode] ?: mode) + insertFeedstockMode.executeUpdate() + } } +} catch (Exception e) { + log.error('Error occurred during ETL process', e) +} finally { + if (fetchFuelCodes) fetchFuelCodes.close() + if (sourceConn) sourceConn.close() + if (destinationConn) destinationConn.close() } + +log.warn('**** COMPLETED FUEL CODE ETL ****') diff --git a/etl/nifi_scripts/fuel_code.old.groovy b/etl/nifi_scripts/fuel_code.old.groovy new file mode 100644 index 000000000..a0fef4616 --- /dev/null +++ b/etl/nifi_scripts/fuel_code.old.groovy @@ -0,0 +1,194 @@ +import org.apache.nifi.processor.io.StreamCallback +import groovy.json.JsonSlurper +import groovy.json.JsonOutput + +def transformCallback = { inputStream, outputStream -> + try { + // Parse JSON input + def record = new JsonSlurper().parseText(inputStream.text) + + // Map the fuel_id to the corresponding new value + def fuelIdMapping = [ + 8 : 13, // Propane + 21 : 15, // Renewable naphtha + 10 : 14, // Renewable gasoline + 19 : 16, // Fossil-derived diesel + 11 : 17, // Fossil-derived gasoline + 20 : 17 // Fossil-derived gasoline + ] + + // Replace fuel_id if it matches one of the keys in the map + if (fuelIdMapping.containsKey(record.fuel_id)) { + record.fuel_id = fuelIdMapping[record.fuel_id] + } + + // Parse facility location + def locationParts = (record.facility_location ?: '').split(',').collect { it.trim() } + def facilityCity = null + def facilityProvinceState = null + def facilityCountry = null + + // Map of provinces and states to their full names (only those appearing in the data) + def provinceStateMap = [ + // Canadian Provinces + 'BC': 'British Columbia', + 'AB': 'Alberta', + 'SK': 'Saskatchewan', + 'MB': 'Manitoba', + 'ON': 'Ontario', + 'QC': 'Quebec', + 'NL': 'Newfoundland and Labrador', + 'BRITISH COLUMBIA': 'British Columbia', + 'ALBERTA': 'Alberta', + 'SASKATCHEWAN': 'Saskatchewan', + 'MANITOBA': 'Manitoba', + 'ONTARIO': 'Ontario', + 'QUEBEC': 'Quebec', + 'NEWFOUNDLAND': 'Newfoundland and Labrador', + + // US States + 'CA': 'California', + 'CT': 'Connecticut', + 'CONNETICUT': 'Connecticut', // Add misspelled variant + 'CONNECTICUT': 'Connecticut', + 'GA': 'Georgia', + 'IA': 'Iowa', + 'IL': 'Illinois', + 'IN': 'Indiana', + 'KS': 'Kansas', + 'LA': 'Louisiana', + 'MN': 'Minnesota', + 'MS': 'Mississippi', + 'MO': 'Missouri', + 'MT': 'Montana', + 'ND': 'North Dakota', + 'NE': 'Nebraska', + 'NM': 'New Mexico', + 'OH': 'Ohio', + 'OK': 'Oklahoma', + 'OR': 'Oregon', + 'SD': 'South Dakota', + 'TX': 'Texas', + 'WA': 'Washington', + 'WI': 'Wisconsin', + 'WY': 'Wyoming', + 'CALIFORNIA': 'California', + 'GEORGIA': 'Georgia', + 'IOWA': 'Iowa', + 'ILLINOIS': 'Illinois', + 'INDIANA': 'Indiana', + 'KANSAS': 'Kansas', + 'LOUISIANA': 'Louisiana', + 'MINNESOTA': 'Minnesota', + 'MISSISSIPPI': 'Mississippi', + 'MISSOURI': 'Missouri', + 'MONTANA': 'Montana', + 'NORTH DAKOTA': 'North Dakota', + 'NEBRASKA': 'Nebraska', + 'NEW MEXICO': 'New Mexico', + 'OHIO': 'Ohio', + 'OKLAHOMA': 'Oklahoma', + 'OREGON': 'Oregon', + 'SOUTH DAKOTA': 'South Dakota', + 'TEXAS': 'Texas', + 'WASHINGTON': 'Washington', + 'WISCONSIN': 'Wisconsin', + 'WYOMING': 'Wyoming', + 'WYOMING.': 'Wyoming', // Add variant with period + ] + + if (locationParts.size() == 1) { + def location = locationParts[0].toUpperCase() + if (location == 'US CENTRAL') { + facilityCountry = 'United States of America' + } else if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } + } else if (locationParts.size() == 2) { + // First part is always city + facilityCity = locationParts[0] + + // Process second part - could be province/state or country + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + // If not a recognized province/state, treat as country + facilityCountry = locationParts[1] + } + } else if (locationParts.size() == 3) { + // First part is always city + facilityCity = locationParts[0] + + // Second part is province/state + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + facilityProvinceState = locationParts[1] + } + + // Third part is always country - expand USA to full name + def country = locationParts[2].toUpperCase() + if (country == 'USA') { + facilityCountry = 'United States of America' + } else { + facilityCountry = locationParts[2] + } + } + + // Map the fields from the source to the target schema + // The following fields are not used in the migration: fuel_code_id, facility_location, renewable_percentage, facility_nameplate_capacity_unit + def transformedRecord = [ + fuel_status_id : record.status_id, + prefix_id : 1, // BCLCF + fuel_suffix : "${record.fuel_code_version}.${record.fuel_code_version_minor}", + company : record.company, + carbon_intensity : record.carbon_intensity, + edrms : '', + last_updated : record.update_timestamp, + application_date : record.application_date, + approval_date : record.approval_date, + fuel_type_id : record.fuel_id, + feedstock : record.feedstock, + feedstock_location : record.feedstock_location, + feedstock_misc : record.feedstock_misc, + facility_nameplate_capacity : record.facility_nameplate, + former_company : record.former_company, + notes : null, + create_date : record.create_timestamp, + update_date : record.update_timestamp, + create_user : null, + update_user : null, + effective_date : record.effective_date, + expiration_date : record.expiry_date, + effective_status : true, + fuel_production_facility_city : facilityCity, + fuel_production_facility_province_state: facilityProvinceState, + fuel_production_facility_country : facilityCountry, + contact_name : null, + contact_email : null + ] + + // Write the transformed data back to the output + outputStream.write(JsonOutput.toJson(transformedRecord).getBytes('UTF-8')) + } catch (Exception e) { + def recordId = record?.id + if (recordId) { + flowFile = session.putAttribute(flowFile, 'failed_record_id', recordId.toString()) + } + throw e + } +} + +// Obtain the flowFile from the session +flowFile = session.get() +if (flowFile != null) { + try { + // Write the transformed data using the transformCallback + flowFile = session.write(flowFile, transformCallback as StreamCallback) + session.transfer(flowFile, REL_SUCCESS) + } catch (Exception e) { + session.transfer(flowFile, REL_FAILURE) + } +}