diff --git a/backend/lcfs/db/migrations/env.py b/backend/lcfs/db/migrations/env.py index e28e674e0..9926b50eb 100644 --- a/backend/lcfs/db/migrations/env.py +++ b/backend/lcfs/db/migrations/env.py @@ -34,6 +34,7 @@ "mv_org_compliance_report_count", "transaction_status_view", "mv_compliance_report_count", + "mv_fuel_code_count", ] diff --git a/backend/lcfs/db/migrations/versions/2025-01-14-23-47_8119d12538df.py b/backend/lcfs/db/migrations/versions/2025-01-14-23-47_8119d12538df.py new file mode 100644 index 000000000..2a5a46a06 --- /dev/null +++ b/backend/lcfs/db/migrations/versions/2025-01-14-23-47_8119d12538df.py @@ -0,0 +1,75 @@ +"""mv for fuel code count + +Revision ID: 8119d12538df +Revises: d25e7c47659e +Create Date: 2025-01-14 23:47:28.504150 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "8119d12538df" +down_revision = "fe03799b4018" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.execute( + """ + CREATE MATERIALIZED VIEW mv_fuel_code_count AS + SELECT + CASE fuel_status_id + WHEN 1 THEN 'Draft' + END as status, + COUNT(*) as count + FROM fuel_code + WHERE fuel_status_id = 1 + GROUP BY fuel_status_id; + """ + ) + + op.execute( + """ + CREATE UNIQUE INDEX mv_fuel_code_count_idx + ON mv_fuel_code_count (status); + """ + ) + + op.execute( + """ + CREATE OR REPLACE FUNCTION refresh_mv_fuel_code_count() + RETURNS TRIGGER AS $$ + BEGIN + REFRESH MATERIALIZED VIEW CONCURRENTLY mv_fuel_code_count; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + """ + ) + + op.execute( + """ + CREATE TRIGGER refresh_mv_fuel_code_count_after_change + AFTER INSERT OR UPDATE OR DELETE ON fuel_code + FOR EACH STATEMENT EXECUTE FUNCTION refresh_mv_fuel_code_count(); + """ + ) + + # Refresh the materialized view to include existing fuel codes + op.execute( + "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_fuel_code_count;" + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.execute( + "DROP TRIGGER IF EXISTS refresh_mv_fuel_code_count_after_change ON fuel_code;") + op.execute("DROP FUNCTION IF EXISTS refresh_mv_fuel_code_count();") + op.execute("DROP MATERIALIZED VIEW IF EXISTS mv_fuel_code_count;") + # ### end Alembic commands ### diff --git a/backend/lcfs/db/migrations/versions/2025-01-15-22-48_5bc0ef48739a.py b/backend/lcfs/db/migrations/versions/2025-01-15-22-48_5bc0ef48739a.py new file mode 100644 index 000000000..5046cf1f2 --- /dev/null +++ b/backend/lcfs/db/migrations/versions/2025-01-15-22-48_5bc0ef48739a.py @@ -0,0 +1,44 @@ +"""add truck and marine transport mode + +Revision ID: 5bc0ef48739a +Revises: f78e53370ed2 +Create Date: 2025-01-15 22:48:43.582069 + +""" + +import sqlalchemy as sa +from alembic import op +from datetime import datetime + +# revision identifiers, used by Alembic. +revision = "5bc0ef48739a" +down_revision = "8119d12538df" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + current_time = datetime.now() + + # Insert Truck and Marine transport modes + op.execute( + """ + INSERT INTO transport_mode (transport_mode, create_date, update_date, create_user, update_user) + VALUES + ('Truck', '{}', '{}', 'no_user', 'no_user'), + ('Marine', '{}', '{}', 'no_user', 'no_user') + + """.format( + current_time, current_time, current_time, current_time + ) + ) + + +def downgrade() -> None: + # Remove Truck and Marine transport modes + op.execute( + """ + DELETE FROM transport_mode + WHERE transport_mode IN ('Truck', 'Marine') + """ + ) diff --git a/backend/lcfs/db/migrations/versions/2025-01-16-19-35_998929392c8b.py b/backend/lcfs/db/migrations/versions/2025-01-16-19-35_998929392c8b.py new file mode 100644 index 000000000..2bb1c4cc3 --- /dev/null +++ b/backend/lcfs/db/migrations/versions/2025-01-16-19-35_998929392c8b.py @@ -0,0 +1,51 @@ +"""add marine end use + +Revision ID: 998929392c8b +Revises: 5bc0ef48739a +Create Date: 2025-01-07 19:35:00.064999 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "998929392c8b" +down_revision = "5bc0ef48739a" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + """ + INSERT INTO end_use_type (end_use_type_id, type, intended_use) + VALUES (25, 'Marine', TRUE) + ON CONFLICT (end_use_type_id) DO NOTHING; + """ + ) + # Energy Effectiveness Ratios + op.execute( + """ + INSERT INTO energy_effectiveness_ratio ( + eer_id, fuel_category_id, fuel_type_id, end_use_type_id, ratio, effective_status + ) + VALUES (44, 2, 3, 25, 2.5, TRUE) + ON CONFLICT (eer_id) DO NOTHING; + """ + ) + + +def downgrade() -> None: + op.execute( + """ + DELETE FROM energy_effectiveness_ratio + WHERE eer_id = 44; + """ + ) + op.execute( + """ + DELETE FROM end_use_type + WHERE end_use_type_id = 25; + """ + ) diff --git a/backend/lcfs/db/migrations/versions/2025-01-16-20-01_98d79870df6b.py b/backend/lcfs/db/migrations/versions/2025-01-16-20-01_98d79870df6b.py new file mode 100644 index 000000000..04f340feb --- /dev/null +++ b/backend/lcfs/db/migrations/versions/2025-01-16-20-01_98d79870df6b.py @@ -0,0 +1,41 @@ +"""Add TFRS Summary Columns + +Revision ID: 98d79870df6b +Revises: 998929392c8b +Create Date: 2025-01-16 20:01:34.038941 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "98d79870df6b" +down_revision = "998929392c8b" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "compliance_report_summary", + sa.Column("credits_offset_a", sa.Integer(), nullable=True), + ) + op.add_column( + "compliance_report_summary", + sa.Column("credits_offset_b", sa.Integer(), nullable=True), + ) + op.add_column( + "compliance_report_summary", + sa.Column("credits_offset_c", sa.Integer(), nullable=True), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("compliance_report_summary", "credits_offset_c") + op.drop_column("compliance_report_summary", "credits_offset_b") + op.drop_column("compliance_report_summary", "credits_offset_a") + # ### end Alembic commands ### diff --git a/backend/lcfs/db/models/compliance/ComplianceReportSummary.py b/backend/lcfs/db/models/compliance/ComplianceReportSummary.py index 96aca512e..8fd41e7d6 100644 --- a/backend/lcfs/db/models/compliance/ComplianceReportSummary.py +++ b/backend/lcfs/db/models/compliance/ComplianceReportSummary.py @@ -109,6 +109,11 @@ class ComplianceReportSummary(BaseModel, Auditable): line_21_non_compliance_penalty_payable = Column(Float, nullable=False, default=0) total_non_compliance_penalty_payable = Column(Float, nullable=False, default=0) + # Legacy TFRS Columns + credits_offset_a = Column(Integer) + credits_offset_b = Column(Integer) + credits_offset_c = Column(Integer) + compliance_report = relationship("ComplianceReport", back_populates="summary") def __repr__(self): diff --git a/backend/lcfs/db/models/fuel/FuelCodeCountView.py b/backend/lcfs/db/models/fuel/FuelCodeCountView.py new file mode 100644 index 000000000..711650769 --- /dev/null +++ b/backend/lcfs/db/models/fuel/FuelCodeCountView.py @@ -0,0 +1,20 @@ +from sqlalchemy import Column, Integer, String +from lcfs.db.base import BaseModel + + +class FuelCodeCountView(BaseModel): + __tablename__ = "mv_fuel_code_count" + __table_args__ = { + "extend_existing": True, + "comment": "Materialized view for counting fuel code by status", + } + + status = Column( + String, + primary_key=True, + comment="Status name (e.g. draft, approved, deleted)" + ) + count = Column( + Integer, + comment="Count of fuel code for this status" + ) diff --git a/backend/lcfs/db/seeders/dev/fuel_code_seeder.py b/backend/lcfs/db/seeders/dev/fuel_code_seeder.py index 059fda5ad..502de1139 100644 --- a/backend/lcfs/db/seeders/dev/fuel_code_seeder.py +++ b/backend/lcfs/db/seeders/dev/fuel_code_seeder.py @@ -34,9 +34,11 @@ def create_fuel_entry( effective_date, expiration_date, fuel_type_id, + fuel_status_id=2, ): return { **base_fuel_data, # Extend with the base fields + "fuel_status_id": fuel_status_id, "fuel_suffix": fuel_suffix, "company": company, "carbon_intensity": carbon_intensity, @@ -49,6 +51,7 @@ def create_fuel_entry( async def seed_fuel_codes(session): fuel_codes_to_seed = [ create_fuel_entry( + fuel_status_id=1, fuel_suffix="102.5", company="Neste Oil Singapore", carbon_intensity=37.21, @@ -57,6 +60,7 @@ async def seed_fuel_codes(session): fuel_type_id=5, ), create_fuel_entry( + fuel_status_id=1, fuel_suffix="124.4", company="Ag Processing Inc.", carbon_intensity=3.62, @@ -65,6 +69,7 @@ async def seed_fuel_codes(session): fuel_type_id=1, ), create_fuel_entry( + fuel_status_id=1, fuel_suffix="125.4", company="Archer Daniels Midland", carbon_intensity=-2.14, @@ -73,6 +78,7 @@ async def seed_fuel_codes(session): fuel_type_id=1, ), create_fuel_entry( + fuel_status_id=3, fuel_suffix="138.5", company="ADM Agri-Industries Company", carbon_intensity=4.26, @@ -81,6 +87,7 @@ async def seed_fuel_codes(session): fuel_type_id=1, ), create_fuel_entry( + fuel_status_id=3, fuel_suffix="143.4", company="Green Plains Otter Tail LLC", carbon_intensity=44.06, @@ -89,6 +96,7 @@ async def seed_fuel_codes(session): fuel_type_id=4, ), create_fuel_entry( + fuel_status_id=3, fuel_suffix="251.2", company="Incobrasa Industries, Ltd.", carbon_intensity=0.35, diff --git a/backend/lcfs/tests/fuel_export/conftest.py b/backend/lcfs/tests/fuel_export/conftest.py index fdb99e06d..e1225e581 100644 --- a/backend/lcfs/tests/fuel_export/conftest.py +++ b/backend/lcfs/tests/fuel_export/conftest.py @@ -1,5 +1,7 @@ +from datetime import datetime import pytest from unittest.mock import AsyncMock, MagicMock +from lcfs.web.api.compliance_report.schema import CompliancePeriodSchema, ComplianceReportHistorySchema, ComplianceReportOrganizationSchema, ComplianceReportStatusSchema, ComplianceReportUserSchema, SummarySchema from lcfs.web.api.fuel_export.repo import FuelExportRepository from lcfs.web.api.fuel_code.repo import FuelCodeRepository from lcfs.web.api.fuel_export.services import FuelExportServices @@ -45,6 +47,49 @@ def mock_compliance_report_repo(): repo = AsyncMock(spec=ComplianceReportRepository) return repo +@pytest.fixture +def compliance_period_schema(): + return CompliancePeriodSchema( + compliance_period_id=1, + description="2024", + effective_date=datetime(2024, 1, 1), + expiration_date=datetime(2024, 3, 31), + display_order=1, + ) + +@pytest.fixture +def compliance_report_organization_schema(): + return ComplianceReportOrganizationSchema( + organization_id=1, name="Acme Corporation" + ) + +@pytest.fixture +def summary_schema(): + return SummarySchema(summary_id=1, is_locked=False) + +@pytest.fixture +def compliance_report_status_schema(): + return ComplianceReportStatusSchema(compliance_report_status_id=1, status="Draft") + +@pytest.fixture +def compliance_report_user_schema(compliance_report_organization_schema): + return ComplianceReportUserSchema( + first_name="John", + last_name="Doe", + organization=compliance_report_organization_schema, + ) + +@pytest.fixture +def compliance_report_history_schema( + compliance_report_status_schema, compliance_report_user_schema +): + return ComplianceReportHistorySchema( + compliance_report_history_id=1, + compliance_report_id=1, + status=compliance_report_status_schema, + user_profile=compliance_report_user_schema, + create_date=datetime(2024, 4, 1, 12, 0, 0), + ) @pytest.fixture def mock_fuel_code_repo(): diff --git a/backend/lcfs/tests/fuel_export/test_fuel_exports_views.py b/backend/lcfs/tests/fuel_export/test_fuel_exports_views.py index e749a051e..14dd924df 100644 --- a/backend/lcfs/tests/fuel_export/test_fuel_exports_views.py +++ b/backend/lcfs/tests/fuel_export/test_fuel_exports_views.py @@ -6,6 +6,8 @@ from fastapi import FastAPI from fastapi.encoders import jsonable_encoder +from lcfs.tests.compliance_report.conftest import compliance_report_base_schema +from lcfs.web.api.compliance_report.schema import ChainedComplianceReportSchema from lcfs.web.api.fuel_export.schema import ( FuelExportSchema, FuelExportCreateUpdateSchema, @@ -68,18 +70,24 @@ async def test_get_fuel_exports_invalid_payload( @pytest.mark.anyio async def test_get_fuel_exports_paginated_success( - client: AsyncClient, fastapi_app: FastAPI, set_mock_user + client: AsyncClient, fastapi_app: FastAPI, set_mock_user, compliance_report_base_schema ): with patch( "lcfs.web.api.fuel_export.views.FuelExportServices.get_fuel_exports_paginated" ) as mock_get_fuel_exports_paginated, patch( "lcfs.web.api.fuel_export.views.ComplianceReportValidation.validate_organization_access" - ) as mock_validate_organization_access: + ) as mock_validate_organization_access, patch( + "lcfs.web.api.fuel_export.views.FuelExportServices.get_compliance_report_by_id" + ) as mock_get_compliance_report_by_id: mock_get_fuel_exports_paginated.return_value = FuelExportsSchema( fuel_exports=[] ) mock_validate_organization_access.return_value = True + + mock_compliance_report = compliance_report_base_schema() + + mock_get_compliance_report_by_id.return_value = mock_compliance_report set_mock_user(fastapi_app, [RoleEnum.ANALYST]) url = fastapi_app.url_path_for("get_fuel_exports") @@ -98,16 +106,23 @@ async def test_get_fuel_exports_paginated_success( @pytest.mark.anyio async def test_get_fuel_exports_list_success( - client: AsyncClient, fastapi_app: FastAPI, set_mock_user + client: AsyncClient, fastapi_app: FastAPI, set_mock_user, compliance_report_base_schema ): with patch( "lcfs.web.api.fuel_export.views.FuelExportServices.get_fuel_export_list" ) as mock_get_fuel_export_list, patch( "lcfs.web.api.fuel_export.views.ComplianceReportValidation.validate_organization_access" - ) as mock_validate_organization_access: + ) as mock_validate_organization_access, patch( + "lcfs.web.api.fuel_export.views.FuelExportServices.get_compliance_report_by_id" + ) as mock_get_compliance_report_by_id: mock_get_fuel_export_list.return_value = FuelExportsSchema(fuel_exports=[]) mock_validate_organization_access.return_value = True + + mock_compliance_report = compliance_report_base_schema() + + mock_get_compliance_report_by_id.return_value = mock_compliance_report + set_mock_user(fastapi_app, [RoleEnum.ANALYST]) url = fastapi_app.url_path_for("get_fuel_exports") diff --git a/backend/lcfs/tests/notional_transfer/test_notional_transfer_view.py b/backend/lcfs/tests/notional_transfer/test_notional_transfer_view.py index 0628782b0..8fef93c04 100644 --- a/backend/lcfs/tests/notional_transfer/test_notional_transfer_view.py +++ b/backend/lcfs/tests/notional_transfer/test_notional_transfer_view.py @@ -5,6 +5,7 @@ from lcfs.db.base import UserTypeEnum, ActionTypeEnum from lcfs.db.models.user.Role import RoleEnum +from lcfs.tests.compliance_report.conftest import compliance_report_base_schema from lcfs.web.api.base import ComplianceReportRequestSchema from lcfs.web.api.notional_transfer.schema import ( PaginatedNotionalTransferRequestSchema, @@ -70,12 +71,20 @@ async def test_get_notional_transfers( ): with patch( "lcfs.web.api.notional_transfer.views.ComplianceReportValidation.validate_organization_access" - ) as mock_validate_organization_access: + ) as mock_validate_organization_access,patch( + "lcfs.web.api.notional_transfer.views.ComplianceReportValidation.validate_compliance_report_access" + ) as mock_validate_compliance_report_access, patch( + "lcfs.web.api.notional_transfer.views.NotionalTransferServices.get_compliance_report_by_id" + ) as mock_get_compliance_report_by_id: set_mock_user(fastapi_app, [RoleEnum.SUPPLIER]) url = fastapi_app.url_path_for("get_notional_transfers") payload = ComplianceReportRequestSchema(compliance_report_id=1).model_dump() mock_validate_organization_access.return_value = True + + mock_get_compliance_report_by_id.return_value = compliance_report_base_schema + mock_validate_compliance_report_access.return_value = True + mock_notional_transfer_service.get_notional_transfers.return_value = { "notionalTransfers": [] } diff --git a/backend/lcfs/tests/other_uses/test_other_uses_view.py b/backend/lcfs/tests/other_uses/test_other_uses_view.py index 3f0e92f42..c2ce0dbca 100644 --- a/backend/lcfs/tests/other_uses/test_other_uses_view.py +++ b/backend/lcfs/tests/other_uses/test_other_uses_view.py @@ -5,6 +5,7 @@ from lcfs.db.base import UserTypeEnum, ActionTypeEnum from lcfs.db.models.user.Role import RoleEnum +from lcfs.tests.compliance_report.conftest import compliance_report_base_schema from lcfs.web.api.base import ComplianceReportRequestSchema from lcfs.web.api.other_uses.schema import ( PaginatedOtherUsesRequestSchema, @@ -69,12 +70,21 @@ async def test_get_other_uses( ): with patch( "lcfs.web.api.other_uses.views.ComplianceReportValidation.validate_organization_access" - ) as mock_validate_organization_access: + ) as mock_validate_organization_access, patch( + "lcfs.web.api.notional_transfer.views.ComplianceReportValidation.validate_compliance_report_access" + ) as mock_validate_compliance_report_access, patch( + "lcfs.web.api.notional_transfer.views.NotionalTransferServices.get_compliance_report_by_id" + ) as mock_get_compliance_report_by_id: + set_mock_user(fastapi_app, [RoleEnum.SUPPLIER]) url = fastapi_app.url_path_for("get_other_uses") payload = ComplianceReportRequestSchema(compliance_report_id=1).model_dump() mock_validate_organization_access.return_value = True + + mock_get_compliance_report_by_id.return_value = compliance_report_base_schema + mock_validate_compliance_report_access.return_value = True + mock_other_uses_service.get_other_uses.return_value = {"otherUses": []} fastapi_app.dependency_overrides[OtherUsesServices] = ( diff --git a/backend/lcfs/web/api/allocation_agreement/services.py b/backend/lcfs/web/api/allocation_agreement/services.py index 12954ec63..6516a8379 100644 --- a/backend/lcfs/web/api/allocation_agreement/services.py +++ b/backend/lcfs/web/api/allocation_agreement/services.py @@ -1,10 +1,11 @@ import math import structlog from typing import List -from fastapi import Depends +from fastapi import Depends, HTTPException, status from datetime import datetime from lcfs.web.api.allocation_agreement.repo import AllocationAgreementRepository +from lcfs.web.api.compliance_report.repo import ComplianceReportRepository from lcfs.web.core.decorators import service_handler from lcfs.db.models.compliance.AllocationAgreement import AllocationAgreement from lcfs.web.api.base import PaginationRequestSchema, PaginationResponseSchema @@ -34,9 +35,11 @@ def __init__( self, repo: AllocationAgreementRepository = Depends(AllocationAgreementRepository), fuel_repo: FuelCodeRepository = Depends(), + compliance_report_repo: ComplianceReportRepository = Depends(), ) -> None: self.repo = repo self.fuel_repo = fuel_repo + self.compliance_report_repo = compliance_report_repo async def convert_to_model( self, allocation_agreement: AllocationAgreementCreateSchema @@ -350,3 +353,18 @@ async def create_allocation_agreement( async def delete_allocation_agreement(self, allocation_agreement_id: int) -> str: """Delete an Allocation agreement""" return await self.repo.delete_allocation_agreement(allocation_agreement_id) + + @service_handler + async def get_compliance_report_by_id(self, compliance_report_id: int): + """Get compliance report by period with status""" + compliance_report = await self.compliance_report_repo.get_compliance_report_by_id( + compliance_report_id, + ) + + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found for this period" + ) + + return compliance_report diff --git a/backend/lcfs/web/api/allocation_agreement/views.py b/backend/lcfs/web/api/allocation_agreement/views.py index 3e30919de..67441ded0 100644 --- a/backend/lcfs/web/api/allocation_agreement/views.py +++ b/backend/lcfs/web/api/allocation_agreement/views.py @@ -8,6 +8,7 @@ from fastapi import ( APIRouter, Body, + HTTPException, status, Request, Response, @@ -69,10 +70,31 @@ async def get_allocation_agreements( report_validate: ComplianceReportValidation = Depends(), ): """Endpoint to get list of allocation agreements for a compliance report""" - await report_validate.validate_organization_access( - request_data.compliance_report_id - ) - return await service.get_allocation_agreements(request_data.compliance_report_id) + try: + compliance_report_id = request_data.compliance_report_id + + compliance_report = await service.get_compliance_report_by_id(compliance_report_id) + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found" + ) + + await report_validate.validate_compliance_report_access(compliance_report) + await report_validate.validate_organization_access( + request_data.compliance_report_id + ) + return await service.get_allocation_agreements(request_data.compliance_report_id) + except HTTPException as http_ex: + # Re-raise HTTP exceptions to preserve status code and message + raise http_ex + except Exception as e: + # Log and handle unexpected errors + logger.exception("Error occurred", error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while processing your request" + ) @router.post( diff --git a/backend/lcfs/web/api/compliance_report/validation.py b/backend/lcfs/web/api/compliance_report/validation.py index 91bd315d2..f17129e8a 100644 --- a/backend/lcfs/web/api/compliance_report/validation.py +++ b/backend/lcfs/web/api/compliance_report/validation.py @@ -1,5 +1,6 @@ from fastapi import Depends, HTTPException, Request from lcfs.db.models.user.Role import RoleEnum +from lcfs.db.models.compliance.ComplianceReportStatus import ComplianceReportStatusEnum from lcfs.web.api.compliance_report.repo import ComplianceReportRepository from fastapi import status from lcfs.web.api.role.schema import user_has_roles @@ -41,3 +42,17 @@ async def validate_organization_access(self, compliance_report_id: int): ) return compliance_report + + async def validate_compliance_report_access(self, compliance_report): + """Validates government user access to draft reports""" + is_government = user_has_roles(self.request.user, [RoleEnum.GOVERNMENT]) + + if compliance_report: + status_enum = ComplianceReportStatusEnum(compliance_report.current_status.status) + is_draft = status_enum == ComplianceReportStatusEnum.Draft + + if is_government and is_draft: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Government users cannot access draft compliance reports" + ) diff --git a/backend/lcfs/web/api/compliance_report/views.py b/backend/lcfs/web/api/compliance_report/views.py index de43c0c26..0d89f51e9 100644 --- a/backend/lcfs/web/api/compliance_report/views.py +++ b/backend/lcfs/web/api/compliance_report/views.py @@ -83,7 +83,8 @@ async def get_compliance_report_by_id( service: ComplianceReportServices = Depends(), validate: ComplianceReportValidation = Depends(), ) -> ChainedComplianceReportSchema: - await validate.validate_organization_access(report_id) + compliance_report = await validate.validate_organization_access(report_id) + await validate.validate_compliance_report_access(compliance_report) mask_statuses = not user_has_roles(request.user, [RoleEnum.GOVERNMENT]) diff --git a/backend/lcfs/web/api/dashboard/repo.py b/backend/lcfs/web/api/dashboard/repo.py index aff0f6634..30f838c68 100644 --- a/backend/lcfs/web/api/dashboard/repo.py +++ b/backend/lcfs/web/api/dashboard/repo.py @@ -15,6 +15,7 @@ from lcfs.db.models.compliance.ComplianceReportCountView import ( ComplianceReportCountView, ) +from lcfs.db.models.fuel.FuelCodeCountView import FuelCodeCountView logger = structlog.get_logger(__name__) @@ -89,3 +90,16 @@ async def get_compliance_report_counts(self): return { "pending_reviews": row.pending_reviews } + + @repo_handler + async def get_fuel_code_counts(self): + query = select( + FuelCodeCountView.count + ).where(FuelCodeCountView.status == "Draft") + + result = await self.db.execute(query) + row = result.fetchone() + + return { + "draft_fuel_codes": getattr(row, "count", 0) + } diff --git a/backend/lcfs/web/api/dashboard/schema.py b/backend/lcfs/web/api/dashboard/schema.py index bb0d803d2..65018cd90 100644 --- a/backend/lcfs/web/api/dashboard/schema.py +++ b/backend/lcfs/web/api/dashboard/schema.py @@ -26,3 +26,7 @@ class OrgComplianceReportCountsSchema(BaseSchema): class ComplianceReportCountsSchema(BaseSchema): pending_reviews: int = Field(default=0) + + +class FuelCodeCountsSchema(BaseSchema): + draft_fuel_codes: int = Field(default=0) diff --git a/backend/lcfs/web/api/dashboard/services.py b/backend/lcfs/web/api/dashboard/services.py index 6e433f9b4..ff965460a 100644 --- a/backend/lcfs/web/api/dashboard/services.py +++ b/backend/lcfs/web/api/dashboard/services.py @@ -7,7 +7,8 @@ TransactionCountsSchema, OrganizarionTransactionCountsSchema, OrgComplianceReportCountsSchema, - ComplianceReportCountsSchema + ComplianceReportCountsSchema, + FuelCodeCountsSchema ) logger = structlog.get_logger(__name__) @@ -66,3 +67,13 @@ async def get_compliance_report_counts( return ComplianceReportCountsSchema( pending_reviews=counts.get("pending_reviews", 0) ) + + @service_handler + async def get_fuel_code_counts( + self + ) -> FuelCodeCountsSchema: + counts = await self.repo.get_fuel_code_counts() + + return FuelCodeCountsSchema( + draft_fuel_codes=counts.get("draft_fuel_codes", 0) + ) diff --git a/backend/lcfs/web/api/dashboard/views.py b/backend/lcfs/web/api/dashboard/views.py index cabde7d16..cfcd8003b 100644 --- a/backend/lcfs/web/api/dashboard/views.py +++ b/backend/lcfs/web/api/dashboard/views.py @@ -8,7 +8,8 @@ TransactionCountsSchema, OrganizarionTransactionCountsSchema, OrgComplianceReportCountsSchema, - ComplianceReportCountsSchema + ComplianceReportCountsSchema, + FuelCodeCountsSchema ) from lcfs.db.models.user.Role import RoleEnum @@ -73,3 +74,17 @@ async def get_compliance_report_counts( ): """Endpoint to retrieve count of compliance reports pending review""" return await service.get_compliance_report_counts() + + +@router.get( + "/fuel-code-counts", + response_model=FuelCodeCountsSchema +) +@view_handler([RoleEnum.ANALYST]) +async def get_fuel_code_counts( + request: Request, + service: DashboardServices = Depends(), +): + """Endpoint to retrieve count of compliance reports pending review""" + + return await service.get_fuel_code_counts() diff --git a/backend/lcfs/web/api/final_supply_equipment/repo.py b/backend/lcfs/web/api/final_supply_equipment/repo.py index f2353b624..8c8163baf 100644 --- a/backend/lcfs/web/api/final_supply_equipment/repo.py +++ b/backend/lcfs/web/api/final_supply_equipment/repo.py @@ -113,17 +113,23 @@ async def get_organization_names(self, organization) -> List[str]: Returns: List[str]: A list of unique organization names. """ - organization_names = ( - await self.db.execute( - select(distinct(FinalSupplyEquipment.organization_name)) - .join(ComplianceReport, FinalSupplyEquipment.compliance_report_id == ComplianceReport.compliance_report_id) - .filter(ComplianceReport.organization_id == organization.organization_id) - .filter(FinalSupplyEquipment.organization_name.isnot(None)) - ) - ).all() + try: + if not organization or not organization.organization_id: + return [] + + organization_names = ( + await self.db.execute( + select(distinct(FinalSupplyEquipment.organization_name)) + .join(ComplianceReport, FinalSupplyEquipment.compliance_report_id == ComplianceReport.compliance_report_id) + .filter(ComplianceReport.organization_id == organization.organization_id) + .filter(FinalSupplyEquipment.organization_name.isnot(None)) + ) + ).all() - # Extract strings from the list of tuples - return [name[0] for name in organization_names] + return [name[0] for name in organization_names] + except Exception as e: + logger.error("Error getting organization names", error=str(e)) + return [] @repo_handler async def get_intended_user_by_name(self, intended_user: str) -> EndUseType: diff --git a/backend/lcfs/web/api/final_supply_equipment/services.py b/backend/lcfs/web/api/final_supply_equipment/services.py index a70b1ce4b..a0251fa7c 100644 --- a/backend/lcfs/web/api/final_supply_equipment/services.py +++ b/backend/lcfs/web/api/final_supply_equipment/services.py @@ -1,10 +1,11 @@ import structlog import math import re -from fastapi import Depends, Request +from fastapi import Depends, HTTPException, Request, status from lcfs.db.models.compliance import FinalSupplyEquipment from lcfs.web.api.base import PaginationRequestSchema, PaginationResponseSchema +from lcfs.web.api.compliance_report.repo import ComplianceReportRepository from lcfs.web.api.compliance_report.schema import FinalSupplyEquipmentSchema from lcfs.web.api.final_supply_equipment.schema import ( FinalSupplyEquipmentCreateSchema, @@ -21,41 +22,52 @@ class FinalSupplyEquipmentServices: def __init__( - self, request: Request = None, repo: FinalSupplyEquipmentRepository = Depends() + self, + request: Request = None, + repo: FinalSupplyEquipmentRepository = Depends(), + compliance_report_repo: ComplianceReportRepository = Depends(), ) -> None: self.request = request self.repo = repo + self.compliance_report_repo = compliance_report_repo @service_handler async def get_fse_options(self): """Fetches all FSE options concurrently.""" - organization = self.request.user.organization - ( - intended_use_types, - levels_of_equipment, - fuel_measurement_types, - intended_user_types, - ports, - organization_names, - ) = await self.repo.get_fse_options(organization) - - return { - "intended_use_types": [ - EndUseTypeSchema.model_validate(t) for t in intended_use_types - ], - "levels_of_equipment": [ - LevelOfEquipmentSchema.model_validate(l) for l in levels_of_equipment - ], - "fuel_measurement_types": [ - FuelMeasurementTypeSchema.model_validate(f) - for f in fuel_measurement_types - ], - "intended_user_types": [ - EndUserTypeSchema.model_validate(u) for u in intended_user_types - ], - "ports": [port.value for port in ports], - "organization_names": organization_names, - } + try: + organization = getattr(self.request.user, 'organization', None) + ( + intended_use_types, + levels_of_equipment, + fuel_measurement_types, + intended_user_types, + ports, + organization_names, + ) = await self.repo.get_fse_options(organization) + + return { + "intended_use_types": [ + EndUseTypeSchema.model_validate(t) for t in intended_use_types + ], + "levels_of_equipment": [ + LevelOfEquipmentSchema.model_validate(l) for l in levels_of_equipment + ], + "fuel_measurement_types": [ + FuelMeasurementTypeSchema.model_validate(f) + for f in fuel_measurement_types + ], + "intended_user_types": [ + EndUserTypeSchema.model_validate(u) for u in intended_user_types + ], + "ports": [port.value for port in ports], + "organization_names": organization_names, + } + except Exception as e: + logger.error("Error getting FSE options", error=str(e)) + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Error retrieving FSE options" + ) async def convert_to_fse_model(self, fse: FinalSupplyEquipmentCreateSchema): fse_model = FinalSupplyEquipment( @@ -296,3 +308,18 @@ async def generate_registration_number(self, postal_code: str) -> str: async def search_manufacturers(self, query: str) -> list[str]: """Search for manufacturers based on the provided query.""" return await self.repo.search_manufacturers(query) + + @service_handler + async def get_compliance_report_by_id(self, compliance_report_id: int): + """Get compliance report by period with status""" + compliance_report = await self.compliance_report_repo.get_compliance_report_by_id( + compliance_report_id, + ) + + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found for this period" + ) + + return compliance_report \ No newline at end of file diff --git a/backend/lcfs/web/api/final_supply_equipment/views.py b/backend/lcfs/web/api/final_supply_equipment/views.py index b061922df..e1070f439 100644 --- a/backend/lcfs/web/api/final_supply_equipment/views.py +++ b/backend/lcfs/web/api/final_supply_equipment/views.py @@ -4,6 +4,7 @@ from fastapi import ( APIRouter, Body, + HTTPException, Query, status, Request, @@ -13,6 +14,7 @@ from lcfs.db import dependencies from lcfs.web.api.base import PaginationRequestSchema +from lcfs.web.api.compliance_report.validation import ComplianceReportValidation from lcfs.web.api.compliance_report.schema import ( CommonPaginatedReportRequestSchema, FinalSupplyEquipmentSchema, @@ -59,22 +61,46 @@ async def get_final_supply_equipments( service: FinalSupplyEquipmentServices = Depends(), report_validate: ComplianceReportValidation = Depends(), ) -> FinalSupplyEquipmentsSchema: - """Endpoint to get list of final supply equipments for a compliance report""" - compliance_report_id = request_data.compliance_report_id - await report_validate.validate_organization_access(compliance_report_id) - if hasattr(request_data, "page") and request_data.page is not None: - # handle pagination. - pagination = PaginationRequestSchema( - page=request_data.page, - size=request_data.size, - sort_orders=request_data.sort_orders, - filters=request_data.filters, - ) - return await service.get_final_supply_equipments_paginated( - pagination, compliance_report_id + """ + Endpoint to get list of final supply equipments for a compliance report + """ + try: + compliance_report_id = request_data.compliance_report_id + + compliance_report = await service.get_compliance_report_by_id(compliance_report_id) + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found" + ) + + await report_validate.validate_compliance_report_access(compliance_report) + await report_validate.validate_organization_access(compliance_report_id) + + if hasattr(request_data, "page") and request_data.page is not None: + # Handle pagination + pagination = PaginationRequestSchema( + page=request_data.page, + size=request_data.size, + sort_orders=request_data.sort_orders, + filters=request_data.filters, + ) + return await service.get_final_supply_equipments_paginated( + pagination, compliance_report_id + ) + else: + return await service.get_fse_list(compliance_report_id) + except HTTPException as http_ex: + # Re-raise HTTP exceptions to preserve status code and message + raise http_ex + except Exception as e: + # Log and handle unexpected errors + logger.exception("Error occurred", error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while processing your request" ) - else: - return await service.get_fse_list(compliance_report_id) + @router.post( diff --git a/backend/lcfs/web/api/fuel_code/repo.py b/backend/lcfs/web/api/fuel_code/repo.py index d05318569..9b4af7b5b 100644 --- a/backend/lcfs/web/api/fuel_code/repo.py +++ b/backend/lcfs/web/api/fuel_code/repo.py @@ -202,7 +202,8 @@ async def get_transport_mode(self, transport_mode_id: int) -> TransportMode: @repo_handler async def get_transport_mode_by_name(self, mode_name: str) -> TransportMode: - query = select(TransportMode).where(TransportMode.transport_mode == mode_name) + query = select(TransportMode).where( + TransportMode.transport_mode == mode_name) result = await self.db.execute(query) transport_mode = result.scalar_one() @@ -247,7 +248,8 @@ async def get_energy_densities(self) -> List[EnergyDensity]: async def get_energy_density(self, fuel_type_id) -> EnergyDensity: """Get the energy density for the specified fuel_type_id""" - stmt = select(EnergyDensity).where(EnergyDensity.fuel_type_id == fuel_type_id) + stmt = select(EnergyDensity).where( + EnergyDensity.fuel_type_id == fuel_type_id) result = await self.db.execute(stmt) energy_density = result.scalars().first() @@ -300,7 +302,8 @@ async def get_fuel_codes_paginated( List[FuelCodeSchema]: A list of fuel codes matching the query. """ delete_status = await self.get_fuel_status_by_status("Deleted") - conditions = [FuelCode.fuel_status_id != delete_status.fuel_code_status_id] + conditions = [FuelCode.fuel_status_id != + delete_status.fuel_code_status_id] for filter in pagination.filters: @@ -341,20 +344,27 @@ async def get_fuel_codes_paginated( field = get_field_for_filter(FuelCode, filter.field) conditions.append( - apply_filter_conditions(field, filter_value, filter_option, filter_type) + apply_filter_conditions( + field, filter_value, filter_option, filter_type) ) # setup pagination - offset = 0 if (pagination.page < 1) else (pagination.page - 1) * pagination.size + offset = 0 if (pagination.page < 1) else ( + pagination.page - 1) * pagination.size limit = pagination.size # Construct the select query with options for eager loading query = ( select(FuelCode) + .join(FuelCode.fuel_code_status) # Add explicit join for status + .join(FuelCode.fuel_code_prefix) # Add explicit join for prefix + .join(FuelCode.fuel_type) # Add explicit join for fuel type .options( - joinedload(FuelCode.fuel_code_status), - joinedload(FuelCode.fuel_code_prefix), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_1), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_2), + contains_eager(FuelCode.fuel_code_status), + contains_eager(FuelCode.fuel_code_prefix), + contains_eager(FuelCode.fuel_type).joinedload( + FuelType.provision_1), + contains_eager(FuelCode.fuel_type).joinedload( + FuelType.provision_2), joinedload(FuelCode.feedstock_fuel_transport_modes).joinedload( FeedstockFuelTransportMode.feedstock_fuel_transport_mode ), @@ -382,7 +392,8 @@ async def get_fuel_codes_paginated( # Execute the main query to retrieve all fuel codes result = await self.db.execute( - query.offset(offset).limit(limit).order_by(FuelCode.create_date.desc()) + query.offset(offset).limit(limit).order_by( + FuelCode.create_date.desc()) ) fuel_codes = result.unique().scalars().all() return fuel_codes, total_count @@ -417,8 +428,10 @@ async def get_fuel_code(self, fuel_code_id: int) -> FuelCode: joinedload(FuelCode.finished_fuel_transport_modes).joinedload( FinishedFuelTransportMode.finished_fuel_transport_mode ), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_1), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_2), + joinedload(FuelCode.fuel_type).joinedload( + FuelType.provision_1), + joinedload(FuelCode.fuel_type).joinedload( + FuelType.provision_2), ) .where(FuelCode.fuel_code_id == fuel_code_id) ) @@ -428,7 +441,8 @@ async def get_fuel_code_status( self, fuel_code_status: FuelCodeStatusEnum ) -> FuelCodeStatus: return await self.db.scalar( - select(FuelCodeStatus).where(FuelCodeStatus.status == fuel_code_status) + select(FuelCodeStatus).where( + FuelCodeStatus.status == fuel_code_status) ) @repo_handler @@ -486,7 +500,8 @@ async def get_contact_email_by_company_and_name( .where( and_( func.lower(FuelCode.company) == func.lower(company), - func.lower(FuelCode.contact_name) == func.lower(contact_name), + func.lower(FuelCode.contact_name) == func.lower( + contact_name), ), func.lower(FuelCode.contact_email).like( func.lower(contact_email + "%") @@ -528,8 +543,10 @@ async def get_fuel_code_by_code_prefix( .options( joinedload(FuelCode.fuel_code_status), joinedload(FuelCode.fuel_code_prefix), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_1), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_2), + joinedload(FuelCode.fuel_type).joinedload( + FuelType.provision_1), + joinedload(FuelCode.fuel_type).joinedload( + FuelType.provision_2), joinedload(FuelCode.feedstock_fuel_transport_modes).joinedload( FeedstockFuelTransportMode.feedstock_fuel_transport_mode ), @@ -670,7 +687,8 @@ async def get_next_available_sub_version_fuel_code_by_prefix( ) result = ( await self.db.execute( - query, {"input_version": int(input_version), "prefix_id": prefix_id} + query, {"input_version": int( + input_version), "prefix_id": prefix_id} ) ).scalar_one_or_none() return self.format_decimal(result) @@ -692,8 +710,10 @@ async def get_latest_fuel_codes(self) -> List[FuelCodeSchema]: joinedload(FuelCode.finished_fuel_transport_modes).joinedload( FinishedFuelTransportMode.finished_fuel_transport_mode ), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_1), - joinedload(FuelCode.fuel_type).joinedload(FuelType.provision_2), + joinedload(FuelCode.fuel_type).joinedload( + FuelType.provision_1), + joinedload(FuelCode.fuel_type).joinedload( + FuelType.provision_2), ) .filter(FuelCodeStatus.status != FuelCodeStatusEnum.Deleted) ) diff --git a/backend/lcfs/web/api/fuel_export/services.py b/backend/lcfs/web/api/fuel_export/services.py index 3fc118a71..4315cfa3a 100644 --- a/backend/lcfs/web/api/fuel_export/services.py +++ b/backend/lcfs/web/api/fuel_export/services.py @@ -1,7 +1,7 @@ import math import structlog -from fastapi import Depends, Request +from fastapi import Depends, HTTPException, Request, status from lcfs.utils.constants import default_ci from lcfs.web.api.base import ( @@ -259,3 +259,18 @@ async def get_fuel_exports_paginated( ), fuel_exports=[FuelExportSchema.model_validate(fs) for fs in fuel_exports], ) + + @service_handler + async def get_compliance_report_by_id(self, compliance_report_id: int): + """Get compliance report by period with status""" + compliance_report = await self.compliance_report_repo.get_compliance_report_by_id( + compliance_report_id, + ) + + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found for this period" + ) + + return compliance_report \ No newline at end of file diff --git a/backend/lcfs/web/api/fuel_export/views.py b/backend/lcfs/web/api/fuel_export/views.py index 292f61326..d44321419 100644 --- a/backend/lcfs/web/api/fuel_export/views.py +++ b/backend/lcfs/web/api/fuel_export/views.py @@ -58,21 +58,42 @@ async def get_fuel_exports( report_validate: ComplianceReportValidation = Depends(), ) -> FuelExportsSchema: """Endpoint to get list of fuel supplied list for a compliance report""" - compliance_report_id = request_data.compliance_report_id - await report_validate.validate_organization_access(compliance_report_id) - if hasattr(request_data, "page") and request_data.page is not None: - # handle pagination. - pagination = PaginationRequestSchema( - page=request_data.page, - size=request_data.size, - sort_orders=request_data.sort_orders, - filters=request_data.filters, - ) - return await service.get_fuel_exports_paginated( - pagination, compliance_report_id + try: + compliance_report_id = request_data.compliance_report_id + + compliance_report = await service.get_compliance_report_by_id(compliance_report_id) + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found" + ) + + await report_validate.validate_compliance_report_access(compliance_report) + await report_validate.validate_organization_access(compliance_report_id) + if hasattr(request_data, "page") and request_data.page is not None: + # handle pagination. + pagination = PaginationRequestSchema( + page=request_data.page, + size=request_data.size, + sort_orders=request_data.sort_orders, + filters=request_data.filters, + ) + return await service.get_fuel_exports_paginated( + pagination, compliance_report_id + ) + else: + return await service.get_fuel_export_list(compliance_report_id) + except HTTPException as http_ex: + # Re-raise HTTP exceptions to preserve status code and message + raise http_ex + except Exception as e: + # Log and handle unexpected errors + logger.exception("Error occurred", error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while processing your request" ) - else: - return await service.get_fuel_export_list(compliance_report_id) + @router.post( diff --git a/backend/lcfs/web/api/fuel_supply/services.py b/backend/lcfs/web/api/fuel_supply/services.py index 3eb146e1e..f9d6460ee 100644 --- a/backend/lcfs/web/api/fuel_supply/services.py +++ b/backend/lcfs/web/api/fuel_supply/services.py @@ -1,8 +1,9 @@ import structlog import math -from fastapi import Depends, Request, HTTPException +from fastapi import Depends, Request, HTTPException, status from lcfs.web.api.base import PaginationRequestSchema, PaginationResponseSchema +from lcfs.web.api.compliance_report.repo import ComplianceReportRepository from lcfs.web.api.fuel_code.repo import FuelCodeRepository from lcfs.web.api.fuel_supply.schema import ( EndUseTypeSchema, @@ -33,10 +34,12 @@ def __init__( request: Request = None, repo: FuelSupplyRepository = Depends(), fuel_repo: FuelCodeRepository = Depends(), + compliance_report_repo: ComplianceReportRepository = Depends(), ) -> None: self.request = request self.repo = repo self.fuel_repo = fuel_repo + self.compliance_report_repo = compliance_report_repo def fuel_type_row_mapper(self, compliance_period, fuel_types, row): column_names = row._fields @@ -268,3 +271,18 @@ async def get_fuel_supplies_paginated( FuelSupplyResponseSchema.model_validate(fs) for fs in fuel_supplies ], ) + + @service_handler + async def get_compliance_report_by_id(self, compliance_report_id: int): + """Get compliance report by period with status""" + compliance_report = await self.compliance_report_repo.get_compliance_report_by_id( + compliance_report_id, + ) + + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found for this period" + ) + + return compliance_report \ No newline at end of file diff --git a/backend/lcfs/web/api/fuel_supply/views.py b/backend/lcfs/web/api/fuel_supply/views.py index cb95c2597..ead977380 100644 --- a/backend/lcfs/web/api/fuel_supply/views.py +++ b/backend/lcfs/web/api/fuel_supply/views.py @@ -48,21 +48,40 @@ async def get_fuel_supply( report_validate: ComplianceReportValidation = Depends(), ) -> FuelSuppliesSchema: """Endpoint to get list of fuel supplied list for a compliance report""" - compliance_report_id = request_data.compliance_report_id - await report_validate.validate_organization_access(compliance_report_id) - if hasattr(request_data, "page") and request_data.page is not None: - # Handle pagination. - pagination = PaginationRequestSchema( - page=request_data.page, - size=request_data.size, - sort_orders=request_data.sort_orders, - filters=request_data.filters, - ) - return await service.get_fuel_supplies_paginated( - pagination, compliance_report_id + try: + compliance_report_id = request_data.compliance_report_id + compliance_report = await service.get_compliance_report_by_id(compliance_report_id) + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found" + ) + + await report_validate.validate_compliance_report_access(compliance_report) + await report_validate.validate_organization_access(compliance_report_id) + if hasattr(request_data, "page") and request_data.page is not None: + # Handle pagination. + pagination = PaginationRequestSchema( + page=request_data.page, + size=request_data.size, + sort_orders=request_data.sort_orders, + filters=request_data.filters, + ) + return await service.get_fuel_supplies_paginated( + pagination, compliance_report_id + ) + else: + return await service.get_fuel_supply_list(compliance_report_id) + except HTTPException as http_ex: + # Re-raise HTTP exceptions to preserve status code and message + raise http_ex + except Exception as e: + # Log and handle unexpected errors + logger.exception("Error occurred", error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while processing your request" ) - else: - return await service.get_fuel_supply_list(compliance_report_id) @router.post( diff --git a/backend/lcfs/web/api/notional_transfer/services.py b/backend/lcfs/web/api/notional_transfer/services.py index 5b66c5cfc..86f3ce212 100644 --- a/backend/lcfs/web/api/notional_transfer/services.py +++ b/backend/lcfs/web/api/notional_transfer/services.py @@ -3,10 +3,11 @@ from typing import Optional import structlog -from fastapi import Depends +from fastapi import Depends, HTTPException, status from lcfs.db.base import UserTypeEnum, ActionTypeEnum from lcfs.db.models.compliance.NotionalTransfer import NotionalTransfer +from lcfs.web.api.compliance_report.repo import ComplianceReportRepository from lcfs.web.api.base import PaginationRequestSchema, PaginationResponseSchema from lcfs.web.api.fuel_code.repo import FuelCodeRepository from lcfs.web.api.notional_transfer.repo import NotionalTransferRepository @@ -40,9 +41,11 @@ def __init__( self, repo: NotionalTransferRepository = Depends(NotionalTransferRepository), fuel_repo: FuelCodeRepository = Depends(), + compliance_report_repo: ComplianceReportRepository = Depends(), ) -> None: self.repo = repo self.fuel_repo = fuel_repo + self.compliance_report_repo = compliance_report_repo async def convert_to_model( self, notional_transfer_data: NotionalTransferCreateSchema @@ -235,3 +238,18 @@ async def delete_notional_transfer( await self.repo.create_notional_transfer(deleted_entity) return DeleteNotionalTransferResponseSchema(message="Marked as deleted.") + + @service_handler + async def get_compliance_report_by_id(self, compliance_report_id: int): + """Get compliance report by period with status""" + compliance_report = await self.compliance_report_repo.get_compliance_report_by_id( + compliance_report_id, + ) + + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found for this period" + ) + + return compliance_report \ No newline at end of file diff --git a/backend/lcfs/web/api/notional_transfer/views.py b/backend/lcfs/web/api/notional_transfer/views.py index 056cc07d5..97a108a3e 100644 --- a/backend/lcfs/web/api/notional_transfer/views.py +++ b/backend/lcfs/web/api/notional_transfer/views.py @@ -67,11 +67,32 @@ async def get_notional_transfers( report_validate: ComplianceReportValidation = Depends(), ): """Endpoint to get list of notional transfers for a compliance report""" - await report_validate.validate_organization_access( + try: request_data.compliance_report_id - ) - return await service.get_notional_transfers(request_data.compliance_report_id) + compliance_report = await service.get_compliance_report_by_id(request_data.compliance_report_id) + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found" + ) + + await report_validate.validate_compliance_report_access(compliance_report) + await report_validate.validate_organization_access( + request_data.compliance_report_id + ) + return await service.get_notional_transfers(request_data.compliance_report_id) + + except HTTPException as http_ex: + # Re-raise HTTP exceptions to preserve status code and message + raise http_ex + except Exception as e: + # Log and handle unexpected errors + logger.exception("Error occurred", error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while processing your request" + ) @router.post( "/list", diff --git a/backend/lcfs/web/api/other_uses/services.py b/backend/lcfs/web/api/other_uses/services.py index ac4bd29f7..61e2c52d7 100644 --- a/backend/lcfs/web/api/other_uses/services.py +++ b/backend/lcfs/web/api/other_uses/services.py @@ -3,9 +3,10 @@ from typing import Optional import structlog -from fastapi import Depends +from fastapi import Depends, HTTPException, status from lcfs.db.base import UserTypeEnum, ActionTypeEnum +from lcfs.web.api.compliance_report.repo import ComplianceReportRepository from lcfs.web.api.other_uses.repo import OtherUsesRepository from lcfs.web.core.decorators import service_handler from lcfs.db.models.compliance.OtherUses import OtherUses @@ -44,9 +45,11 @@ def __init__( self, repo: OtherUsesRepository = Depends(OtherUsesRepository), fuel_repo: FuelCodeRepository = Depends(), + compliance_report_repo: ComplianceReportRepository = Depends(), ) -> None: self.repo = repo self.fuel_repo = fuel_repo + self.compliance_report_repo = compliance_report_repo async def schema_to_model(self, other_use: OtherUsesCreateSchema) -> OtherUses: """ @@ -294,3 +297,18 @@ async def delete_other_use( await self.repo.create_other_use(deleted_entity) return DeleteOtherUsesResponseSchema(success=True, message="Marked as deleted.") + + @service_handler + async def get_compliance_report_by_id(self, compliance_report_id: int): + """Get compliance report by period with status""" + compliance_report = await self.compliance_report_repo.get_compliance_report_by_id( + compliance_report_id, + ) + + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found for this period" + ) + + return compliance_report \ No newline at end of file diff --git a/backend/lcfs/web/api/other_uses/views.py b/backend/lcfs/web/api/other_uses/views.py index 915f691e6..47f4c3018 100644 --- a/backend/lcfs/web/api/other_uses/views.py +++ b/backend/lcfs/web/api/other_uses/views.py @@ -61,10 +61,31 @@ async def get_other_uses( report_validate: ComplianceReportValidation = Depends(), ): """Endpoint to get list of other uses for a compliance report""" - await report_validate.validate_organization_access( - request_data.compliance_report_id - ) - return await service.get_other_uses(request_data.compliance_report_id) + try: + compliance_report_id = request_data.compliance_report_id + + compliance_report = await service.get_compliance_report_by_id(compliance_report_id) + if not compliance_report: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Compliance report not found" + ) + + await report_validate.validate_compliance_report_access(compliance_report) + await report_validate.validate_organization_access( + request_data.compliance_report_id + ) + return await service.get_other_uses(request_data.compliance_report_id) + except HTTPException as http_ex: + # Re-raise HTTP exceptions to preserve status code and message + raise http_ex + except Exception as e: + # Log and handle unexpected errors + logger.exception("Error occurred", error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while processing your request" + ) @router.post( diff --git a/backend/lcfs/web/api/transaction/repo.py b/backend/lcfs/web/api/transaction/repo.py index 861b1e32b..7af5ed2f3 100644 --- a/backend/lcfs/web/api/transaction/repo.py +++ b/backend/lcfs/web/api/transaction/repo.py @@ -83,8 +83,9 @@ async def get_transactions_paginated( non_transfer_condition = and_( TransactionView.transaction_type != "Transfer", TransactionView.to_organization_id == organization_id, - TransactionView.status - == "Approved", # This status includes InitiativeAgreement and AdminAdjustment + TransactionView.status.in_( + ["Approved", "Assessed"] + ), ) # Combine conditions since an organization can be both transferor and transferee, or neither for non-"Transfer" transactions diff --git a/etl/database/nifi-registry-primary.mv.db b/etl/database/nifi-registry-primary.mv.db index f5b1bf947..559b3fbf2 100644 Binary files a/etl/database/nifi-registry-primary.mv.db and b/etl/database/nifi-registry-primary.mv.db differ diff --git a/etl/nifi/conf/flow.json.gz b/etl/nifi/conf/flow.json.gz index e2e4eaaef..84d2fdaf7 100644 Binary files a/etl/nifi/conf/flow.json.gz and b/etl/nifi/conf/flow.json.gz differ diff --git a/etl/nifi/conf/flow.xml.gz b/etl/nifi/conf/flow.xml.gz index 203856fcb..2d5e2642b 100644 Binary files a/etl/nifi/conf/flow.xml.gz and b/etl/nifi/conf/flow.xml.gz differ diff --git a/etl/nifi_scripts/clean_ups.groovy b/etl/nifi_scripts/clean_ups.groovy index 025c46ecb..8a893064d 100644 --- a/etl/nifi_scripts/clean_ups.groovy +++ b/etl/nifi_scripts/clean_ups.groovy @@ -1,7 +1,6 @@ import java.sql.Connection import java.sql.PreparedStatement -import java.sql.ResultSet -import groovy.json.JsonSlurper +import java.sql.SQLException log.warn('**** STARTING TRANSFER UPDATE SQL ****') @@ -14,39 +13,73 @@ def updateTransferEffectiveDateSQL = """ AND update_date::date = transaction_effective_date::date; -- On the same day as transaction_effective_date """ -// Fetch connections to both source and destination databases -// Replace the UUIDs with your actual Controller Service identifiers -// For this UPDATE, only the destination database connection is required -def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') +// Cleanup queries +def cleanUpQueries = [ + """ + -- 105 + UPDATE "transaction" + SET compliance_units = -6994 + WHERE transaction_id = 1491; + """, + """ + -- 273 + UPDATE compliance_report + SET transaction_id = null + WHERE compliance_report_id = 764; + """, + """ + DELETE FROM "transaction" + WHERE transaction_id = 1920; + """ +] -// Initialize database connections +// Fetch connection to the destination database +def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') Connection destinationConn = null try { - // Get a connection from the Destination DBCP Connection Pool + // Obtain a connection from the Destination DBCP Connection Pool destinationConn = destinationDbcpService.getConnection() + destinationConn.setAutoCommit(false) // Begin transaction - // Step 1: Execute the UPDATE statement - PreparedStatement updateStmt = destinationConn.prepareStatement(updateTransferEffectiveDateSQL) - - // Execute the UPDATE statement - int rowsUpdated = updateStmt.executeUpdate() + // Step 1: Execute the UPDATE on public.transfer + try (PreparedStatement updateStmt = destinationConn.prepareStatement(updateTransferEffectiveDateSQL)) { + int rowsUpdated = updateStmt.executeUpdate() + log.info("Successfully executed UPDATE on 'public.transfer'. Rows affected: ${rowsUpdated}") + } - log.info("Successfully executed UPDATE on 'public.transfer'. Rows affected: ${rowsUpdated}") + // Step 2: Execute the cleanup queries in sequence + cleanUpQueries.each { query -> + try (PreparedStatement stmt = destinationConn.prepareStatement(query)) { + stmt.executeUpdate() + } + } + log.info("Cleanup queries executed successfully.") - // Close the UPDATE statement - updateStmt.close() + // Commit transaction + destinationConn.commit() + log.info("Transaction committed successfully.") } catch (Exception e) { - log.error('Error occurred while executing TRANSFER UPDATE SQL', e) - throw new ProcessException(e) + // Rollback transaction on error + if (destinationConn != null) { + try { + destinationConn.rollback() + log.warn("Transaction rolled back due to error.") + } catch (SQLException rollbackEx) { + log.error("Error occurred during transaction rollback", rollbackEx) + } + } + log.error('Error occurred during SQL operations', e) + throw new RuntimeException(e) } finally { // Ensure the connection is closed if (destinationConn != null) { try { destinationConn.close() - } catch (SQLException ignore) { - // Ignored + log.info("Database connection closed.") + } catch (SQLException closeEx) { + log.warn("Error occurred while closing the database connection", closeEx) } } } diff --git a/etl/nifi_scripts/compliance_summary.groovy b/etl/nifi_scripts/compliance_summary.groovy new file mode 100644 index 000000000..5bf8c02f5 --- /dev/null +++ b/etl/nifi_scripts/compliance_summary.groovy @@ -0,0 +1,499 @@ +import groovy.json.JsonSlurper +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.sql.Timestamp +import java.util.Calendar + + +// ========================================= +// NiFi Controller Services +// ========================================= +def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') +def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') + +Connection sourceConn = null +Connection destinationConn = null + +try { + sourceConn = sourceDbcpService.getConnection() + destinationConn = destinationDbcpService.getConnection() + destinationConn.setAutoCommit(false) + + // ========================================= + // Fetch Data from Source Table + // ========================================= + + def SOURCE_QUERY = """ + SELECT + cr.summary_id, + cr.id AS compliance_report_id, + crs.gasoline_class_retained, + crs.gasoline_class_deferred, + crs.diesel_class_retained, + crs.diesel_class_deferred, + crs.credits_offset, + crs.diesel_class_obligation, + crs.diesel_class_previously_retained, + crs.gasoline_class_obligation, + crs.gasoline_class_previously_retained, + crs.credits_offset_a, + crs.credits_offset_b, + crs.credits_offset_c + FROM + public.compliance_report cr + JOIN + public.compliance_report_summary crs + ON cr.summary_id = crs.id WHERE cr.summary_id IS NOT NULL + ORDER BY + cr.id; + """ + + PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY) + ResultSet rs = sourceStmt.executeQuery() + + // ========================================= + // Prepare Destination Insert Statement + // ========================================= + + def INSERT_DESTINATION_SUMMARY_SQL = """ + INSERT INTO public.compliance_report_summary ( + summary_id, + compliance_report_id, + quarter, + is_locked, + line_1_fossil_derived_base_fuel_gasoline, + line_1_fossil_derived_base_fuel_diesel, + line_1_fossil_derived_base_fuel_jet_fuel, + line_2_eligible_renewable_fuel_supplied_gasoline, + line_2_eligible_renewable_fuel_supplied_diesel, + line_2_eligible_renewable_fuel_supplied_jet_fuel, + line_3_total_tracked_fuel_supplied_gasoline, + line_3_total_tracked_fuel_supplied_diesel, + line_3_total_tracked_fuel_supplied_jet_fuel, + line_4_eligible_renewable_fuel_required_gasoline, + line_4_eligible_renewable_fuel_required_diesel, + line_4_eligible_renewable_fuel_required_jet_fuel, + line_5_net_notionally_transferred_gasoline, + line_5_net_notionally_transferred_diesel, + line_5_net_notionally_transferred_jet_fuel, + line_6_renewable_fuel_retained_gasoline, + line_6_renewable_fuel_retained_diesel, + line_6_renewable_fuel_retained_jet_fuel, + line_7_previously_retained_gasoline, + line_7_previously_retained_diesel, + line_7_previously_retained_jet_fuel, + line_8_obligation_deferred_gasoline, + line_8_obligation_deferred_diesel, + line_8_obligation_deferred_jet_fuel, + line_9_obligation_added_gasoline, + line_9_obligation_added_diesel, + line_9_obligation_added_jet_fuel, + line_10_net_renewable_fuel_supplied_gasoline, + line_10_net_renewable_fuel_supplied_diesel, + line_10_net_renewable_fuel_supplied_jet_fuel, + line_11_non_compliance_penalty_gasoline, + line_11_non_compliance_penalty_diesel, + line_11_non_compliance_penalty_jet_fuel, + line_12_low_carbon_fuel_required, + line_13_low_carbon_fuel_supplied, + line_14_low_carbon_fuel_surplus, + line_15_banked_units_used, + line_16_banked_units_remaining, + line_17_non_banked_units_used, + line_18_units_to_be_banked, + line_19_units_to_be_exported, + line_20_surplus_deficit_units, + line_21_surplus_deficit_ratio, + line_22_compliance_units_issued, + line_11_fossil_derived_base_fuel_gasoline, + line_11_fossil_derived_base_fuel_diesel, + line_11_fossil_derived_base_fuel_jet_fuel, + line_11_fossil_derived_base_fuel_total, + line_21_non_compliance_penalty_payable, + total_non_compliance_penalty_payable, + credits_offset_a, + credits_offset_b, + credits_offset_c, + create_date, + update_date, + create_user, + update_user + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + """ + + + PreparedStatement destinationStmt = destinationConn.prepareStatement(INSERT_DESTINATION_SUMMARY_SQL) + + // ========================================= + // Initialize Counters for Logging + // ========================================= + + int totalInserted = 0 + int totalSkipped = 0 + + // ========================================= + // Processing Loop + // ========================================= + + while (rs.next()) { + // Fetch source fields + def summaryId = rs.getInt("summary_id") + def complianceReportId = rs.getInt("compliance_report_id") + def gasolineClassRetained = rs.getBigDecimal('gasoline_class_retained') + def gasolineClassDeferred = rs.getBigDecimal('gasoline_class_deferred') + def dieselClassRetained = rs.getBigDecimal('diesel_class_retained') + def dieselClassDeferred = rs.getBigDecimal('diesel_class_deferred') + def creditsOffset = rs.getInt('credits_offset') + def dieselClassObligation = rs.getBigDecimal('diesel_class_obligation') + def dieselClassPreviouslyRetained = rs.getBigDecimal('diesel_class_previously_retained') + def gasolineClassObligation = rs.getBigDecimal('gasoline_class_obligation') + def gasolineClassPreviouslyRetained = rs.getBigDecimal('gasoline_class_previously_retained') + def creditsOffsetA = rs.getInt('credits_offset_a') + def creditsOffsetB = rs.getInt('credits_offset_b') + def creditsOffsetC = rs.getInt('credits_offset_c') + + // Create a summaryRecord map for the destination table + def summaryRecord = [ + summary_id : summaryId, + compliance_report_id : complianceReportId, + quarter : null, + is_locked : true, + line_1_fossil_derived_base_fuel_gasoline : null, // No direct mapping + line_1_fossil_derived_base_fuel_diesel : null, // No direct mapping + line_1_fossil_derived_base_fuel_jet_fuel : null, // No direct mapping + line_2_eligible_renewable_fuel_supplied_gasoline : null, // No direct mapping + line_2_eligible_renewable_fuel_supplied_diesel : null, // No direct mapping + line_2_eligible_renewable_fuel_supplied_jet_fuel : null, // No direct mapping + line_3_total_tracked_fuel_supplied_gasoline : null, // No direct mapping + line_3_total_tracked_fuel_supplied_diesel : null, // No direct mapping + line_3_total_tracked_fuel_supplied_jet_fuel : null, // No direct mapping + line_4_eligible_renewable_fuel_required_gasoline : null, // No direct mapping + line_4_eligible_renewable_fuel_required_diesel : null, // No direct mapping + line_4_eligible_renewable_fuel_required_jet_fuel : null, // No direct mapping + line_5_net_notionally_transferred_gasoline : null, // No direct mapping + line_5_net_notionally_transferred_diesel : null, // No direct mapping + line_5_net_notionally_transferred_jet_fuel : null, // No direct mapping + line_6_renewable_fuel_retained_gasoline : gasolineClassRetained, + line_6_renewable_fuel_retained_diesel : dieselClassRetained, + line_6_renewable_fuel_retained_jet_fuel : null, // No direct mapping + line_7_previously_retained_gasoline : gasolineClassPreviouslyRetained, + line_7_previously_retained_diesel : dieselClassPreviouslyRetained, + line_7_previously_retained_jet_fuel : null, // No direct mapping + line_8_obligation_deferred_gasoline : gasolineClassDeferred, + line_8_obligation_deferred_diesel : dieselClassDeferred, + line_8_obligation_deferred_jet_fuel : null, // No direct mapping + line_9_obligation_added_gasoline : gasolineClassObligation, + line_9_obligation_added_diesel : dieselClassObligation, + line_9_obligation_added_jet_fuel : null, // No direct mapping + line_10_net_renewable_fuel_supplied_gasoline : null, // No direct mapping + line_10_net_renewable_fuel_supplied_diesel : null, // No direct mapping + line_10_net_renewable_fuel_supplied_jet_fuel : null, // No direct mapping + line_11_non_compliance_penalty_gasoline : null, // No direct mapping + line_11_non_compliance_penalty_diesel : null, // No direct mapping + line_11_non_compliance_penalty_jet_fuel : null, // No direct mapping + line_12_low_carbon_fuel_required : null, // No direct mapping + line_13_low_carbon_fuel_supplied : null, // No direct mapping + line_14_low_carbon_fuel_surplus : null, // No direct mapping + line_15_banked_units_used : null, // No direct mapping + line_16_banked_units_remaining : null, // No direct mapping + line_17_non_banked_units_used : null, // No direct mapping + line_18_units_to_be_banked : null, // No direct mapping + line_19_units_to_be_exported : null, // No direct mapping + line_20_surplus_deficit_units : null, // No direct mapping + line_21_surplus_deficit_ratio : null, // No direct mapping + line_22_compliance_units_issued : creditsOffset, // No direct mapping + line_11_fossil_derived_base_fuel_gasoline : null, // No direct mapping + line_11_fossil_derived_base_fuel_diesel : null, // No direct mapping + line_11_fossil_derived_base_fuel_jet_fuel : null, // No direct mapping + line_11_fossil_derived_base_fuel_total : null, // No direct mapping + line_21_non_compliance_penalty_payable : null, // No direct mapping + total_non_compliance_penalty_payable : null, // No direct mapping + credits_offset_a : creditsOffsetA, // Direct mapping + credits_offset_b : creditsOffsetB, // Direct mapping + credits_offset_c : creditsOffsetC, // Direct mapping + create_date : new Timestamp(System.currentTimeMillis()), + update_date : new Timestamp(System.currentTimeMillis()), + create_user : "etl_user", // Replace with actual user or mapping + update_user : "etl_user" // Replace with actual user or mapping + ] + + + // ========================================= + // Insertion into Destination Table + // ========================================= + + try { + // 1. summary_id (int4) + destinationStmt.setInt(1, summaryRecord.summary_id) + + // 2. compliance_report_id (int4) + destinationStmt.setInt(2, summaryRecord.compliance_report_id) + + // 3. quarter (int4) + if (summaryRecord.quarter != null) { + destinationStmt.setInt(3, summaryRecord.quarter) + } else { + destinationStmt.setNull(3, java.sql.Types.INTEGER) + } + + // 4. is_locked (bool) + destinationStmt.setBoolean(4, summaryRecord.is_locked) + + // 5. line_1_fossil_derived_base_fuel_gasoline (float8) NOT NULL + if (summaryRecord.line_1_fossil_derived_base_fuel_gasoline != null) { + destinationStmt.setDouble(5, summaryRecord.line_1_fossil_derived_base_fuel_gasoline.doubleValue()) + } else { + destinationStmt.setDouble(5, 0.0) // Default value or handle as per business logic + } + + // 6. line_1_fossil_derived_base_fuel_diesel (float8) NOT NULL + if (summaryRecord.line_1_fossil_derived_base_fuel_diesel != null) { + destinationStmt.setDouble(6, summaryRecord.line_1_fossil_derived_base_fuel_diesel.doubleValue()) + } else { + destinationStmt.setDouble(6, 0.0) + } + + // 7. line_1_fossil_derived_base_fuel_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(7, 0.0) // No mapping + + // 8. line_2_eligible_renewable_fuel_supplied_gasoline (float8) NOT NULL + destinationStmt.setDouble(8, 0.0) // No mapping + + // 9. line_2_eligible_renewable_fuel_supplied_diesel (float8) NOT NULL + destinationStmt.setDouble(9, 0.0) // No mapping + + // 10. line_2_eligible_renewable_fuel_supplied_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(10, 0.0) // No mapping + + // 11. line_3_total_tracked_fuel_supplied_gasoline (float8) NOT NULL + destinationStmt.setDouble(11, 0.0) // No mapping + + // 12. line_3_total_tracked_fuel_supplied_diesel (float8) NOT NULL + destinationStmt.setDouble(12, 0.0) // No mapping + + // 13. line_3_total_tracked_fuel_supplied_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(13, 0.0) // No mapping + + // 14. line_4_eligible_renewable_fuel_required_gasoline (float8) NOT NULL + destinationStmt.setDouble(14, 0.0) // No mapping + + // 15. line_4_eligible_renewable_fuel_required_diesel (float8) NOT NULL + destinationStmt.setDouble(15, 0.0) // No mapping + + // 16. line_4_eligible_renewable_fuel_required_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(16, 0.0) // No mapping + + // 17. line_5_net_notionally_transferred_gasoline (float8) NOT NULL + destinationStmt.setDouble(17, 0.0) // No mapping + + // 18. line_5_net_notionally_transferred_diesel (float8) NOT NULL + destinationStmt.setDouble(18, 0.0) // No mapping + + // 19. line_5_net_notionally_transferred_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(19, 0.0) // No mapping + + // 20. line_6_renewable_fuel_retained_gasoline (float8) NOT NULL + if (summaryRecord.line_6_renewable_fuel_retained_gasoline != null) { + destinationStmt.setDouble(20, summaryRecord.line_6_renewable_fuel_retained_gasoline.doubleValue()) + } else { + destinationStmt.setDouble(20, 0.0) + } + + // 21. line_6_renewable_fuel_retained_diesel (float8) NOT NULL + if (summaryRecord.line_6_renewable_fuel_retained_diesel != null) { + destinationStmt.setDouble(21, summaryRecord.line_6_renewable_fuel_retained_diesel.doubleValue()) + } else { + destinationStmt.setDouble(21, 0.0) + } + + // 22. line_6_renewable_fuel_retained_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(22, 0.0) // No mapping + + // 23. line_7_previously_retained_gasoline (float8) NOT NULL + if (summaryRecord.line_7_previously_retained_gasoline != null) { + destinationStmt.setDouble(23, summaryRecord.line_7_previously_retained_gasoline.doubleValue()) + } else { + destinationStmt.setDouble(23, 0.0) + } + + // 24. line_7_previously_retained_diesel (float8) NOT NULL + if (summaryRecord.line_7_previously_retained_diesel != null) { + destinationStmt.setDouble(24, summaryRecord.line_7_previously_retained_diesel.doubleValue()) + } else { + destinationStmt.setDouble(24, 0.0) + } + + // 25. line_7_previously_retained_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(25, 0.0) // No mapping + + // 26. line_8_obligation_deferred_gasoline (float8) NOT NULL + if (summaryRecord.line_8_obligation_deferred_gasoline != null) { + destinationStmt.setDouble(26, summaryRecord.line_8_obligation_deferred_gasoline.doubleValue()) + } else { + destinationStmt.setDouble(26, 0.0) + } + + // 27. line_8_obligation_deferred_diesel (float8) NOT NULL + if (summaryRecord.line_8_obligation_deferred_diesel != null) { + destinationStmt.setDouble(27, summaryRecord.line_8_obligation_deferred_diesel.doubleValue()) + } else { + destinationStmt.setDouble(27, 0.0) + } + + // 28. line_8_obligation_deferred_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(28, 0.0) // No mapping + + // 29. line_9_obligation_added_gasoline (float8) NOT NULL + if (summaryRecord.line_9_obligation_added_gasoline != null) { + destinationStmt.setDouble(29, summaryRecord.line_9_obligation_added_gasoline.doubleValue()) + } else { + destinationStmt.setDouble(29, 0.0) + } + + // 30. line_9_obligation_added_diesel (float8) NOT NULL + if (summaryRecord.line_9_obligation_added_diesel != null) { + destinationStmt.setDouble(30, summaryRecord.line_9_obligation_added_diesel.doubleValue()) + } else { + destinationStmt.setDouble(30, 0.0) + } + + // 31. line_9_obligation_added_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(31, 0.0) // No mapping + + // 32. line_10_net_renewable_fuel_supplied_gasoline (float8) NOT NULL + destinationStmt.setDouble(32, 0.0) // No mapping + + // 33. line_10_net_renewable_fuel_supplied_diesel (float8) NOT NULL + destinationStmt.setDouble(33, 0.0) // No mapping + + // 34. line_10_net_renewable_fuel_supplied_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(34, 0.0) // No mapping + + // 35. line_11_non_compliance_penalty_gasoline (float8) + destinationStmt.setNull(35, java.sql.Types.FLOAT) // No mapping + + // 36. line_11_non_compliance_penalty_diesel (float8) + destinationStmt.setNull(36, java.sql.Types.FLOAT) // No mapping + + // 37. line_11_non_compliance_penalty_jet_fuel (float8) + destinationStmt.setNull(37, java.sql.Types.FLOAT) // No mapping + + // 38. line_12_low_carbon_fuel_required (float8) NOT NULL + destinationStmt.setDouble(38, 0.0) // No mapping + + // 39. line_13_low_carbon_fuel_supplied (float8) NOT NULL + destinationStmt.setDouble(39, 0.0) // No mapping + + // 40. line_14_low_carbon_fuel_surplus (float8) NOT NULL + destinationStmt.setDouble(40, 0.0) // No mapping + + // 41. line_15_banked_units_used (float8) NOT NULL + destinationStmt.setDouble(41, 0.0) // No mapping + + // 42. line_16_banked_units_remaining (float8) NOT NULL + destinationStmt.setDouble(42, 0.0) // No mapping + + // 43. line_17_non_banked_units_used (float8) NOT NULL + destinationStmt.setDouble(43, 0.0) // No mapping + + // 44. line_18_units_to_be_banked (float8) NOT NULL + destinationStmt.setDouble(44, 0.0) // No mapping + + // 45. line_19_units_to_be_exported (float8) NOT NULL + destinationStmt.setDouble(45, 0.0) // No mapping + + // 46. line_20_surplus_deficit_units (float8) NOT NULL + destinationStmt.setDouble(46, 0.0) // No mapping + + // 47. line_21_surplus_deficit_ratio (float8) NOT NULL + destinationStmt.setDouble(47, 0.0) // No mapping + + // 48. line_22_compliance_units_issued (float8) NOT NULL + destinationStmt.setDouble(48, summary.line_22_compliance_units_issued) + + // 49. line_11_fossil_derived_base_fuel_gasoline (float8) NOT NULL + destinationStmt.setDouble(49, 0.0) // No mapping + + // 50. line_11_fossil_derived_base_fuel_diesel (float8) NOT NULL + destinationStmt.setDouble(50, 0.0) // No mapping + + // 51. line_11_fossil_derived_base_fuel_jet_fuel (float8) NOT NULL + destinationStmt.setDouble(51, 0.0) // No mapping + + // 52. line_11_fossil_derived_base_fuel_total (float8) NOT NULL + destinationStmt.setDouble(52, 0.0) // No mapping + + // 53. line_21_non_compliance_penalty_payable (float8) NOT NULL + destinationStmt.setDouble(53, 0.0) // No mapping + + // 54. total_non_compliance_penalty_payable (float8) NOT NULL + destinationStmt.setDouble(54, 0.0) // No mapping + + // 55. credits_offset_a (int4) + destinationStmt.setInt(55, summaryRecord.credits_offset_a) + + // 56. credits_offset_b (int4) + destinationStmt.setInt(56, summaryRecord.credits_offset_b) + + // 57. credits_offset_c (int4) + destinationStmt.setInt(57, summaryRecord.credits_offset_c) + + // 58. create_date (timestamptz) + destinationStmt.setTimestamp(58, summaryRecord.create_date) + + // 59. update_date (timestamptz) + destinationStmt.setTimestamp(59, summaryRecord.update_date) + + // 60. create_user (varchar) + destinationStmt.setString(60, summaryRecord.create_user) + + // 61. update_user (varchar) + destinationStmt.setString(61, summaryRecord.update_user) + + // Add to batch + destinationStmt.addBatch() + totalInserted++ + + } catch (Exception e) { + log.error("Failed to insert summary_record for compliance_report_id: ${summaryRecord.compliance_report_id}", e) + totalSkipped++ + // Continue processing other records + continue + } + } + + // ========================================= + // Execute Batch and Commit + // ========================================= + + try { + destinationStmt.executeBatch() + destinationConn.commit() + log.info("Successfully inserted ${totalInserted} records into destination compliance_report_summary.") + if (totalSkipped > 0) { + log.warn("Skipped ${totalSkipped} records due to insertion errors.") + } + } catch (Exception e) { + log.error("Batch insertion failed. Rolling back.", e) + destinationConn.rollback() + } finally { + // ========================================= + // Cleanup Resources + // ========================================= + if (rs != null && !rs.isClosed()) rs.close() + if (sourceStmt != null && !sourceStmt.isClosed()) sourceStmt.close() + if (destinationStmt != null && !destinationStmt.isClosed()) destinationStmt.close() + if (sourceConn != null && !sourceConn.isClosed()) sourceConn.close() + if (destinationConn != null && !destinationConn.isClosed()) destinationConn.close() + } + +} catch (Exception e) { + log.error("An error occurred during the ETL process.", e) + // Ensure connections are closed in case of unexpected errors + if (sourceConn != null && !sourceConn.isClosed()) sourceConn.close() + if (destinationConn != null && !destinationConn.isClosed()) destinationConn.close() + throw e +} diff --git a/etl/nifi_scripts/fuel_code.groovy b/etl/nifi_scripts/fuel_code.groovy index 57de4f16f..11aa7017f 100644 --- a/etl/nifi_scripts/fuel_code.groovy +++ b/etl/nifi_scripts/fuel_code.groovy @@ -1,80 +1,321 @@ -import org.apache.nifi.processor.io.StreamCallback +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet import groovy.json.JsonSlurper -import groovy.json.JsonOutput - -def transformCallback = { inputStream, outputStream -> - try { - // Parse JSON input - def record = new JsonSlurper().parseText(inputStream.text) - - // Map the fuel_id to the corresponding new value - def fuelIdMapping = [ - 8 : 13, // Propane - 21 : 15, // Renewable naphtha - 10 : 14, // Renewable gasoline - 19 : 16, // Fossil-derived diesel - 11 : 17, // Fossil-derived gasoline - 20 : 17 // Fossil-derived gasoline - ] - - // Replace fuel_id if it matches one of the keys in the map - if (fuelIdMapping.containsKey(record.fuel_id)) { - record.fuel_id = fuelIdMapping[record.fuel_id] + +log.warn('**** STARTING FUEL CODE ETL ****') + +def fuelIdMapping = [ + 8 : 13, + 9 : 5, // TODO: need to double check this + 10 : 14, + 11 : 17, + 19 : 16, + 20 : 17, + 21 : 15, +] + +def transportModeMapping = [ + 1 : 6, + 3 : 7 +] + +def provinceStateMap = [ + // Canadian Provinces + 'BC': 'British Columbia', + 'AB': 'Alberta', + 'SK': 'Saskatchewan', + 'MB': 'Manitoba', + 'ON': 'Ontario', + 'QC': 'Quebec', + 'NL': 'Newfoundland and Labrador', + 'BRITISH COLUMBIA': 'British Columbia', + 'ALBERTA': 'Alberta', + 'SASKATCHEWAN': 'Saskatchewan', + 'MANITOBA': 'Manitoba', + 'ONTARIO': 'Ontario', + 'QUEBEC': 'Quebec', + 'NEWFOUNDLAND': 'Newfoundland and Labrador', + + // US States + 'CA': 'California', + 'CT': 'Connecticut', + 'GA': 'Georgia', + 'IA': 'Iowa', + 'IL': 'Illinois', + 'IN': 'Indiana', + 'KS': 'Kansas', + 'LA': 'Louisiana', + 'MN': 'Minnesota', + 'MS': 'Mississippi', + 'MO': 'Missouri', + 'MT': 'Montana', + 'ND': 'North Dakota', + 'NE': 'Nebraska', + 'NM': 'New Mexico', + 'OH': 'Ohio', + 'OK': 'Oklahoma', + 'OR': 'Oregon', + 'SD': 'South Dakota', + 'TX': 'Texas', + 'WA': 'Washington', + 'WI': 'Wisconsin', + 'WY': 'Wyoming', + 'CALIFORNIA': 'California', + 'GEORGIA': 'Georgia', + 'IOWA': 'Iowa', + 'ILLINOIS': 'Illinois', + 'INDIANA': 'Indiana', + 'KANSAS': 'Kansas', + 'LOUISIANA': 'Louisiana', + 'MINNESOTA': 'Minnesota', + 'MISSISSIPPI': 'Mississippi', + 'MISSOURI': 'Missouri', + 'MONTANA': 'Montana', + 'NORTH DAKOTA': 'North Dakota', + 'NEBRASKA': 'Nebraska', + 'NEW MEXICO': 'New Mexico', + 'OHIO': 'Ohio', + 'OKLAHOMA': 'Oklahoma', + 'OREGON': 'Oregon', + 'SOUTH DAKOTA': 'South Dakota', + 'TEXAS': 'Texas', + 'WASHINGTON': 'Washington', + 'WISCONSIN': 'Wisconsin', + 'WYOMING': 'Wyoming', + 'WYOMING.': 'Wyoming', // Add variant with period + 'CONNETICUT': 'Connecticut', // Add misspelled variant + 'CONNECTICUT': 'Connecticut', +] + +def fuelCodeQuery = ''' + SELECT + fc.*, + array_agg(DISTINCT finishedFtm.transport_mode_id) AS finished_fuel_transport_modes, + array_agg(DISTINCT feedstockFtf.transport_mode_id) AS feedstock_fuel_transport_modes + FROM + fuel_code fc + LEFT JOIN + fuel_transport_mode_fuel_code finishedFtm ON fc.id = finishedFtm.fuel_code_id + LEFT JOIN + feedstock_transport_mode_fuel_code feedstockFtf ON fc.id = feedstockFtf.fuel_code_id + GROUP BY + fc.id; +''' + +// Insert `fuel_code` into the target database +def insertFuelCodeSQL = ''' + INSERT INTO fuel_code ( + fuel_status_id, + fuel_suffix, + company, + carbon_intensity, + last_updated, + application_date, + approval_date, + fuel_type_id, + feedstock, + feedstock_location, + feedstock_misc, + facility_nameplate_capacity, + former_company, + create_date, + update_date, + effective_date, + expiration_date, + fuel_production_facility_city, + fuel_production_facility_province_state, + fuel_production_facility_country, + prefix_id, + edrms, + notes, + create_user, + update_user, + effective_status, + contact_name, + contact_email + ) + VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + 1, + '', + null, + null, + null, + true, + null, + null + ) + RETURNING fuel_code_id; +''' + +// Insert into `finished_fuel_transport_mode` +def insertFinishedFuelTransportModeSQL = ''' + INSERT INTO finished_fuel_transport_mode (fuel_code_id, transport_mode_id) + VALUES (?, ?); +''' + +// Insert into `feedstock_fuel_transport_mode` +def insertFeedstockFuelTransportModeSQL = ''' + INSERT INTO feedstock_fuel_transport_mode (fuel_code_id, transport_mode_id) + VALUES (?, ?); +''' + +// Fetch connections to both source and destination databases +def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') +def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') + +sourceConn = null +destinationConn = null + +try { + // Get connections + sourceConn = sourceDbcpService.getConnection() + destinationConn = destinationDbcpService.getConnection() + + fetchFuelCodes = sourceConn.prepareStatement(fuelCodeQuery) + fuelCodes = fetchFuelCodes.executeQuery() + + while (fuelCodes.next()) { + def fuelCodeVersion = fuelCodes.getString('fuel_code_version') + def fuelCodeVersionMinor = fuelCodes.getString('fuel_code_version_minor') + + def fuelStatusId = fuelCodes.getInt('status_id') + def fuelSuffix = "${fuelCodeVersion}.${fuelCodeVersionMinor}" + def company = fuelCodes.getString('company') + def carbonIntensity = fuelCodes.getDouble('carbon_intensity') + def lastUpdated = fuelCodes.getDate('update_timestamp') + def applicationDate = fuelCodes.getDate('application_date') + def approvalDate = fuelCodes.getDate('approval_date') + def fuelTypeId = fuelCodes.getInt('fuel_id') + def feedstock = fuelCodes.getString('feedstock') + def feedstockLocation = fuelCodes.getString('feedstock_location') + def feedstockMisc = fuelCodes.getString('feedstock_misc') + def facilityNameplateCapacity = fuelCodes.getDouble('facility_nameplate') + def formerCompany = fuelCodes.getString('former_company') + def createDate = fuelCodes.getDate('create_timestamp') + def updateDate = fuelCodes.getDate('update_timestamp') + def effectiveDate = fuelCodes.getDate('effective_date') + def expirationDate = fuelCodes.getDate('expiry_date') + + def facilityLocation = fuelCodes.getString('facility_location') + + def locationParts = (facilityLocation ?: '').split(',').collect { it.trim() } + def facilityCity = null + def facilityProvinceState = null + def facilityCountry = null + + if (locationParts.size() == 1) { + def location = locationParts[0].toUpperCase() + if (location == 'US CENTRAL') { + facilityCountry = 'United States of America' + } else if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } + } else if (locationParts.size() == 2) { + // First part is always city + facilityCity = locationParts[0] + + // Process second part - could be province/state or country + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + // If not a recognized province/state, treat as country + facilityCountry = locationParts[1] + } + } else if (locationParts.size() == 3) { + // First part is always city + facilityCity = locationParts[0] + + // Second part is province/state + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + facilityProvinceState = locationParts[1] + } + + // Third part is always country - expand USA to full name + def country = locationParts[2].toUpperCase() + if (country == 'USA') { + facilityCountry = 'United States of America' + } else { + facilityCountry = locationParts[2] + } } - // Map the fields from the source to the target schema - // The following fields are not used in the migration: fuel_code_id, facility_location, renewable_percentage, facility_nameplate_capacity_unit - def transformedRecord = [ - fuel_status_id : record.status_id, - prefix_id : 1, // BCLCF - fuel_suffix : "${record.fuel_code_version}.${record.fuel_code_version_minor}", - company : record.company, - carbon_intensity : record.carbon_intensity, - edrms : "", - last_updated : record.update_timestamp, - application_date : record.application_date, - approval_date : record.approval_date, - fuel_type_id : record.fuel_id, - feedstock : record.feedstock, - feedstock_location : record.feedstock_location, - feedstock_misc : record.feedstock_misc, - facility_nameplate_capacity : record.facility_nameplate, - former_company : record.former_company, - notes : null, - create_date : record.create_timestamp, - update_date : record.update_timestamp, - create_user : null, - update_user : null, - effective_date : record.effective_date, - expiration_date : record.expiry_date, - effective_status : true, - fuel_production_facility_city : null, - fuel_production_facility_province_state: null, - fuel_production_facility_country : null, - contact_name : null, - contact_email : null - ] - - // Write the transformed data back to the output - outputStream.write(JsonOutput.toJson(transformedRecord).getBytes("UTF-8")) - - } catch (Exception e) { - def recordId = record?.id - if (recordId) { - flowFile = session.putAttribute(flowFile, "failed_record_id", recordId.toString()) + def finishedModes = fuelCodes.getArray('finished_fuel_transport_modes')?.getArray() ?: [] + def feedstockModes = fuelCodes.getArray('feedstock_fuel_transport_modes')?.getArray() ?: [] + + // Insert `fuel_code` and get the new ID + def insertFuelCode = destinationConn.prepareStatement(insertFuelCodeSQL) + insertFuelCode.setInt(1, fuelStatusId) + insertFuelCode.setString(2, fuelSuffix) + insertFuelCode.setString(3, company) + insertFuelCode.setDouble(4, carbonIntensity) + insertFuelCode.setDate(5, lastUpdated) + insertFuelCode.setDate(6, applicationDate) + insertFuelCode.setDate(7, approvalDate) + insertFuelCode.setInt(8, fuelIdMapping[fuelTypeId] ?: fuelTypeId) + insertFuelCode.setString(9, feedstock) + insertFuelCode.setString(10, feedstockLocation) + insertFuelCode.setString(11, feedstockMisc) + insertFuelCode.setDouble(12, facilityNameplateCapacity) + insertFuelCode.setString(13, formerCompany) + insertFuelCode.setDate(14, createDate) + insertFuelCode.setDate(15, updateDate) + insertFuelCode.setDate(16, effectiveDate) + insertFuelCode.setDate(17, expirationDate) + insertFuelCode.setString(18, facilityCity) + insertFuelCode.setString(19, facilityProvinceState) + insertFuelCode.setString(20, facilityCountry) + + def rs = insertFuelCode.executeQuery() + def newFuelCodeId = rs.next() ? rs.getInt('fuel_code_id') : null + rs.close() + + // Insert finished fuel transport modes + finishedModes.each { mode -> + def insertFinishedMode = destinationConn.prepareStatement(insertFinishedFuelTransportModeSQL) + insertFinishedMode.setInt(1, newFuelCodeId) + insertFinishedMode.setInt(2, transportModeMapping[mode] ?: mode) + insertFinishedMode.executeUpdate() } - throw e - } -} -// Obtain the flowFile from the session -flowFile = session.get() -if (flowFile != null) { - try { - // Write the transformed data using the transformCallback - flowFile = session.write(flowFile, transformCallback as StreamCallback) - session.transfer(flowFile, REL_SUCCESS) - } catch (Exception e) { - session.transfer(flowFile, REL_FAILURE) + // Insert feedstock fuel transport modes + feedstockModes.each { mode -> + def insertFeedstockMode = destinationConn.prepareStatement(insertFeedstockFuelTransportModeSQL) + insertFeedstockMode.setInt(1, newFuelCodeId) + insertFeedstockMode.setInt(2, transportModeMapping[mode] ?: mode) + insertFeedstockMode.executeUpdate() + } } +} catch (Exception e) { + log.error('Error occurred during ETL process', e) +} finally { + if (fetchFuelCodes) fetchFuelCodes.close() + if (sourceConn) sourceConn.close() + if (destinationConn) destinationConn.close() } + +log.warn('**** COMPLETED FUEL CODE ETL ****') diff --git a/etl/nifi_scripts/fuel_code.old.groovy b/etl/nifi_scripts/fuel_code.old.groovy new file mode 100644 index 000000000..a0fef4616 --- /dev/null +++ b/etl/nifi_scripts/fuel_code.old.groovy @@ -0,0 +1,194 @@ +import org.apache.nifi.processor.io.StreamCallback +import groovy.json.JsonSlurper +import groovy.json.JsonOutput + +def transformCallback = { inputStream, outputStream -> + try { + // Parse JSON input + def record = new JsonSlurper().parseText(inputStream.text) + + // Map the fuel_id to the corresponding new value + def fuelIdMapping = [ + 8 : 13, // Propane + 21 : 15, // Renewable naphtha + 10 : 14, // Renewable gasoline + 19 : 16, // Fossil-derived diesel + 11 : 17, // Fossil-derived gasoline + 20 : 17 // Fossil-derived gasoline + ] + + // Replace fuel_id if it matches one of the keys in the map + if (fuelIdMapping.containsKey(record.fuel_id)) { + record.fuel_id = fuelIdMapping[record.fuel_id] + } + + // Parse facility location + def locationParts = (record.facility_location ?: '').split(',').collect { it.trim() } + def facilityCity = null + def facilityProvinceState = null + def facilityCountry = null + + // Map of provinces and states to their full names (only those appearing in the data) + def provinceStateMap = [ + // Canadian Provinces + 'BC': 'British Columbia', + 'AB': 'Alberta', + 'SK': 'Saskatchewan', + 'MB': 'Manitoba', + 'ON': 'Ontario', + 'QC': 'Quebec', + 'NL': 'Newfoundland and Labrador', + 'BRITISH COLUMBIA': 'British Columbia', + 'ALBERTA': 'Alberta', + 'SASKATCHEWAN': 'Saskatchewan', + 'MANITOBA': 'Manitoba', + 'ONTARIO': 'Ontario', + 'QUEBEC': 'Quebec', + 'NEWFOUNDLAND': 'Newfoundland and Labrador', + + // US States + 'CA': 'California', + 'CT': 'Connecticut', + 'CONNETICUT': 'Connecticut', // Add misspelled variant + 'CONNECTICUT': 'Connecticut', + 'GA': 'Georgia', + 'IA': 'Iowa', + 'IL': 'Illinois', + 'IN': 'Indiana', + 'KS': 'Kansas', + 'LA': 'Louisiana', + 'MN': 'Minnesota', + 'MS': 'Mississippi', + 'MO': 'Missouri', + 'MT': 'Montana', + 'ND': 'North Dakota', + 'NE': 'Nebraska', + 'NM': 'New Mexico', + 'OH': 'Ohio', + 'OK': 'Oklahoma', + 'OR': 'Oregon', + 'SD': 'South Dakota', + 'TX': 'Texas', + 'WA': 'Washington', + 'WI': 'Wisconsin', + 'WY': 'Wyoming', + 'CALIFORNIA': 'California', + 'GEORGIA': 'Georgia', + 'IOWA': 'Iowa', + 'ILLINOIS': 'Illinois', + 'INDIANA': 'Indiana', + 'KANSAS': 'Kansas', + 'LOUISIANA': 'Louisiana', + 'MINNESOTA': 'Minnesota', + 'MISSISSIPPI': 'Mississippi', + 'MISSOURI': 'Missouri', + 'MONTANA': 'Montana', + 'NORTH DAKOTA': 'North Dakota', + 'NEBRASKA': 'Nebraska', + 'NEW MEXICO': 'New Mexico', + 'OHIO': 'Ohio', + 'OKLAHOMA': 'Oklahoma', + 'OREGON': 'Oregon', + 'SOUTH DAKOTA': 'South Dakota', + 'TEXAS': 'Texas', + 'WASHINGTON': 'Washington', + 'WISCONSIN': 'Wisconsin', + 'WYOMING': 'Wyoming', + 'WYOMING.': 'Wyoming', // Add variant with period + ] + + if (locationParts.size() == 1) { + def location = locationParts[0].toUpperCase() + if (location == 'US CENTRAL') { + facilityCountry = 'United States of America' + } else if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } + } else if (locationParts.size() == 2) { + // First part is always city + facilityCity = locationParts[0] + + // Process second part - could be province/state or country + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + // If not a recognized province/state, treat as country + facilityCountry = locationParts[1] + } + } else if (locationParts.size() == 3) { + // First part is always city + facilityCity = locationParts[0] + + // Second part is province/state + def location = locationParts[1].toUpperCase() + if (provinceStateMap.containsKey(location)) { + facilityProvinceState = provinceStateMap[location] + } else { + facilityProvinceState = locationParts[1] + } + + // Third part is always country - expand USA to full name + def country = locationParts[2].toUpperCase() + if (country == 'USA') { + facilityCountry = 'United States of America' + } else { + facilityCountry = locationParts[2] + } + } + + // Map the fields from the source to the target schema + // The following fields are not used in the migration: fuel_code_id, facility_location, renewable_percentage, facility_nameplate_capacity_unit + def transformedRecord = [ + fuel_status_id : record.status_id, + prefix_id : 1, // BCLCF + fuel_suffix : "${record.fuel_code_version}.${record.fuel_code_version_minor}", + company : record.company, + carbon_intensity : record.carbon_intensity, + edrms : '', + last_updated : record.update_timestamp, + application_date : record.application_date, + approval_date : record.approval_date, + fuel_type_id : record.fuel_id, + feedstock : record.feedstock, + feedstock_location : record.feedstock_location, + feedstock_misc : record.feedstock_misc, + facility_nameplate_capacity : record.facility_nameplate, + former_company : record.former_company, + notes : null, + create_date : record.create_timestamp, + update_date : record.update_timestamp, + create_user : null, + update_user : null, + effective_date : record.effective_date, + expiration_date : record.expiry_date, + effective_status : true, + fuel_production_facility_city : facilityCity, + fuel_production_facility_province_state: facilityProvinceState, + fuel_production_facility_country : facilityCountry, + contact_name : null, + contact_email : null + ] + + // Write the transformed data back to the output + outputStream.write(JsonOutput.toJson(transformedRecord).getBytes('UTF-8')) + } catch (Exception e) { + def recordId = record?.id + if (recordId) { + flowFile = session.putAttribute(flowFile, 'failed_record_id', recordId.toString()) + } + throw e + } +} + +// Obtain the flowFile from the session +flowFile = session.get() +if (flowFile != null) { + try { + // Write the transformed data using the transformCallback + flowFile = session.write(flowFile, transformCallback as StreamCallback) + session.transfer(flowFile, REL_SUCCESS) + } catch (Exception e) { + session.transfer(flowFile, REL_FAILURE) + } +} diff --git a/etl/readme.md b/etl/readme.md index 37307864d..76d7bb7c5 100755 --- a/etl/readme.md +++ b/etl/readme.md @@ -1,5 +1,7 @@ # ETL Overview -This project sets up Apache NiFi along with two PostgreSQL databases, TFRS and LCFS, using Docker. It enables data migration between these databases via NiFi. + +This project sets up Apache NiFi along with two PostgreSQL databases, TFRS and LCFS, using Docker. It enables data +migration between these databases via NiFi. ## How to Use @@ -9,12 +11,11 @@ This project sets up Apache NiFi along with two PostgreSQL databases, TFRS and L $ docker-compose up -d ``` -Starts three containers: NiFi, TFRS, and LCFS databases. +Starts three containers: NiFi, TFRS Database, and Zookeeper. ## 2. Access NiFi: -Go to http://localhost:8080/nifi/ - +Go to http://localhost:8091/nifi/ ## 3. Load NiFi Template: @@ -33,7 +34,7 @@ Drag the template onto the canvas. ## 5. Enable Services: -Click the lignting bolt next to all services to Enable them +Click the lightning bolt next to all services to Enable them ## 6. Data transfer between OpenShift and local containers: @@ -67,19 +68,24 @@ Click the Start icon to begin the data flow. To monitor your NiFi data flow: ## 1. View Flow Status: - - Check each processor and connection for real-time data on processed, queued, or penalized FlowFiles. - - Click on components for detailed stats and performance metrics. + +- Check each processor and connection for real-time data on processed, queued, or penalized FlowFiles. +- Click on components for detailed stats and performance metrics. ## 2. Enable Bulletins: - - Configure bulletins to receive alerts (INFO, WARN, ERROR) for any issues that require attention. + +- Configure bulletins to receive alerts (INFO, WARN, ERROR) for any issues that require attention. ## 3. Use Data Provenance: - - Track the lineage of each FlowFile to see its origin, processing steps, and final destination. + +- Track the lineage of each FlowFile to see its origin, processing steps, and final destination. ## 4. Monitor System Health: - - Use the ## Summary## tab to check overall system health, including memory usage, thread activity, and performance. + +- Use the ## Summary## tab to check overall system health, including memory usage, thread activity, and performance. ## Error Handling + If any records cannot be added to the databases, they will be logged and stored in the `nifi_output` directory. You can access these failed records for further inspection or troubleshooting. \ No newline at end of file diff --git a/frontend/Dockerfile.openshift b/frontend/Dockerfile.openshift index 9cc584f19..7801a3e46 100644 --- a/frontend/Dockerfile.openshift +++ b/frontend/Dockerfile.openshift @@ -1,4 +1,4 @@ -FROM artifacts.developer.gov.bc.ca/docker-remote/node:20 as builder +FROM artifacts.developer.gov.bc.ca/docker-remote/node:20.18.1 as builder ENV NODE_ENV=production WORKDIR /usr/src/app COPY ./ ./ diff --git a/frontend/src/assets/locales/en/dashboard.json b/frontend/src/assets/locales/en/dashboard.json index 66527f31e..20ce49790 100644 --- a/frontend/src/assets/locales/en/dashboard.json +++ b/frontend/src/assets/locales/en/dashboard.json @@ -26,6 +26,12 @@ "viewAllTransactions": "View all transactions", "loadingMessage": "Loading transactions card..." }, + "fuelCodes": { + "title": "Fuel Codes", + "thereAre": "There are:", + "fcInProgress": "Fuel Code(s) in progress", + "loadingMessage": "Loading fuel codes card..." + }, "complianceReports": { "title": "Compliance reports", "thereAre": "There are:", diff --git a/frontend/src/constants/routes/apiRoutes.js b/frontend/src/constants/routes/apiRoutes.js index f2cd2a5b4..cf5de482e 100644 --- a/frontend/src/constants/routes/apiRoutes.js +++ b/frontend/src/constants/routes/apiRoutes.js @@ -66,6 +66,7 @@ export const apiRoutes = { allocationAgreementSearch: '/allocation-agreement/search?', OrgComplianceReportCounts: '/dashboard/org-compliance-report-counts', complianceReportCounts: '/dashboard/compliance-report-counts', + fuelCodeCounts: '/dashboard/fuel-code-counts', organizationSearch: '/organizations/search?', getUserActivities: '/users/:userID/activity', getAllUserActivities: '/users/activities/all', diff --git a/frontend/src/hooks/useDashboard.js b/frontend/src/hooks/useDashboard.js index 6a60a94eb..aefed6278 100644 --- a/frontend/src/hooks/useDashboard.js +++ b/frontend/src/hooks/useDashboard.js @@ -70,3 +70,15 @@ export const useComplianceReportCounts = () => { } }) } +export const useFuelCodeCounts = () => { + const client = useApiService() + const path = apiRoutes.fuelCodeCounts + + return useQuery({ + queryKey: ['fuel-code-counts'], + queryFn: async () => { + const response = await client.get(path) + return response.data + } + }) +} diff --git a/frontend/src/utils/keycloak.js b/frontend/src/utils/keycloak.js index f4f9d586c..2dc0364ff 100644 --- a/frontend/src/utils/keycloak.js +++ b/frontend/src/utils/keycloak.js @@ -18,7 +18,11 @@ export const getKeycloak = () => { export const logout = () => { sessionStorage.removeItem('keycloak-logged-in') - + const idToken = keycloak.idToken || keycloak.tokenParsed?.idToken; + if (!idToken) { + console.error('idToken is not available'); + return; + } const keycloakLogoutUrl = keycloak.endpoints.logout() + '?post_logout_redirect_uri=' + @@ -26,7 +30,7 @@ export const logout = () => { '&client_id=' + keycloak.clientId + '&id_token_hint=' + - keycloak.idToken + idToken const url = CONFIG.KEYCLOAK.SM_LOGOUT_URL + encodeURIComponent(keycloakLogoutUrl) diff --git a/frontend/src/views/Dashboard/Dashboard.jsx b/frontend/src/views/Dashboard/Dashboard.jsx index 91991a57d..16e613353 100644 --- a/frontend/src/views/Dashboard/Dashboard.jsx +++ b/frontend/src/views/Dashboard/Dashboard.jsx @@ -19,6 +19,7 @@ import { OrgUserSettingsCard } from './components/cards' import OrganizationsSummaryCard from './components/cards/idir/OrganizationsSummaryCard' +import { FuelCodeCard } from './components/cards/idir/FuelCodeCard' import { ComplianceReportCard } from './components/cards/idir/ComplianceReportCard' export const Dashboard = () => { @@ -68,10 +69,16 @@ export const Dashboard = () => { - - - + + + + + + + + + diff --git a/frontend/src/views/Dashboard/components/cards/idir/ComplianceReportCard.jsx b/frontend/src/views/Dashboard/components/cards/idir/ComplianceReportCard.jsx index 2f1b9b610..f0fb472dd 100644 --- a/frontend/src/views/Dashboard/components/cards/idir/ComplianceReportCard.jsx +++ b/frontend/src/views/Dashboard/components/cards/idir/ComplianceReportCard.jsx @@ -24,7 +24,6 @@ export const ComplianceReportCard = () => { const { t } = useTranslation(['dashboard']) const navigate = useNavigate() const { data: counts, isLoading } = useComplianceReportCounts() - console.log('ComplianceReportCard counts:', counts) const handleNavigation = () => { navigate(ROUTES.REPORTS, { diff --git a/frontend/src/views/Dashboard/components/cards/idir/FuelCodeCard.jsx b/frontend/src/views/Dashboard/components/cards/idir/FuelCodeCard.jsx new file mode 100644 index 000000000..6d6fad491 --- /dev/null +++ b/frontend/src/views/Dashboard/components/cards/idir/FuelCodeCard.jsx @@ -0,0 +1,99 @@ +import BCTypography from '@/components/BCTypography' +import BCWidgetCard from '@/components/BCWidgetCard/BCWidgetCard' +import Loading from '@/components/Loading' +import { ROUTES } from '@/constants/routes' +import { useFuelCodeCounts } from '@/hooks/useDashboard' +import { List, ListItemButton, Stack } from '@mui/material' +import { useTranslation } from 'react-i18next' +import { useNavigate } from 'react-router-dom' + +const CountDisplay = ({ count }) => ( + + {count} + +) + +export const FuelCodeCard = () => { + const { t } = useTranslation(['dashboard']) + const navigate = useNavigate() + const { data: counts, isLoading } = useFuelCodeCounts() + + const handleNavigation = () => { + navigate(ROUTES.FUELCODES, { + state: { + filters: [ + { + field: 'status', + filter: 'Draft', + filterType: 'text', + type: 'equals' + } + ] + } + }) + } + + const renderLinkWithCount = (text, count, onClick) => { + return ( + <> + {count != null && } + + {text} + + + ) + } + + return ( + + ) : ( + + + {t('dashboard:fuelCodes.thereAre')} + + + + {renderLinkWithCount( + t('dashboard:fuelCodes.fcInProgress'), + counts?.draftFuelCodes || 0, + handleNavigation + )} + + + + ) + } + /> + ) +} diff --git a/frontend/src/views/FuelCodes/FuelCodes.jsx b/frontend/src/views/FuelCodes/FuelCodes.jsx index ca6949df4..1db1bf5d4 100644 --- a/frontend/src/views/FuelCodes/FuelCodes.jsx +++ b/frontend/src/views/FuelCodes/FuelCodes.jsx @@ -141,6 +141,7 @@ const FuelCodesBase = () => { getRowId={getRowId} overlayNoRowsTemplate={t('fuelCode:noFuelCodesFound')} defaultColDef={defaultColDef} + defaultFilterModel={location.state?.filters} onSetResetGrid={handleSetResetGrid} />