From 1a04d61ca8be35f98e3b6f87bb96b3b833640bda Mon Sep 17 00:00:00 2001 From: Jeremy Traini Date: Sun, 12 Mar 2023 20:48:52 +1100 Subject: [PATCH 1/2] Testing and implementation of report database query endpoints (#17) * Moved validation docs outside functions to be parsed only once * Adding tests for change name, delete, list all and list by * Adding server calls for new report endpoints * Fixing report tests + implementation of list_all endpoint * Fixing list by test to better test functionality * Implementation of list_by endpoint * Fixes to change_name tests * Implementation of change name endpoint * Implementation of report/delete endpoint * Adding tests for check validity endpoint * Implementation of check validity endpoint * Small changes to bulk upload tests * Implementation of bulk upload passing tests * Updating tests to work with server * Initial implementation of bulk_export endpoint * Fixing file upload endpoint * Fixing bulk export implementation --------- Co-authored-by: Mohamad Mohamad --- README.md | 2 +- src/classes/Sample.py | 4 - src/constants.py | 9 ++ src/database.py | 4 +- src/export.py | 9 ++ src/generation.py | 148 +++++++++++------------- src/helpers.py | 1 + src/invoice.py | 19 ++- src/report.py | 86 +++++--------- src/server.py | 47 ++++---- src/setup_database.py | 17 --- src/type_structure.py | 6 +- {src/classes => tests/bulk}/__init__.py | 0 tests/bulk/bulk_export_test.py | 37 ++++++ tests/bulk/bulk_upload_test.py | 36 ++++++ tests/report/__init__.py | 0 tests/report/change_name_test.py | 26 +++++ tests/report/check_validity_test.py | 20 ++++ tests/report/delete_test.py | 17 +++ tests/report/list_all_test.py | 39 +++++++ tests/report/list_by_test.py | 27 +++++ tests/server_calls.py | 50 ++++++++ 22 files changed, 401 insertions(+), 203 deletions(-) delete mode 100644 src/classes/Sample.py create mode 100644 src/constants.py delete mode 100644 src/setup_database.py rename {src/classes => tests/bulk}/__init__.py (100%) create mode 100644 tests/bulk/bulk_export_test.py create mode 100644 tests/bulk/bulk_upload_test.py create mode 100644 tests/report/__init__.py create mode 100644 tests/report/change_name_test.py create mode 100644 tests/report/check_validity_test.py create mode 100644 tests/report/delete_test.py create mode 100644 tests/report/list_all_test.py create mode 100644 tests/report/list_by_test.py diff --git a/README.md b/README.md index 29e5e60..aa979a2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# se2021-23t1-einvoicing-api-template +# SENG2021 23T1 CHURROS E-Invoicing Validation API In order to run the server, we must diff --git a/src/classes/Sample.py b/src/classes/Sample.py deleted file mode 100644 index d7f6b1a..0000000 --- a/src/classes/Sample.py +++ /dev/null @@ -1,4 +0,0 @@ - -class Sample: - def __init__(self, val) -> None: - self._val = val diff --git a/src/constants.py b/src/constants.py new file mode 100644 index 0000000..6baa4a6 --- /dev/null +++ b/src/constants.py @@ -0,0 +1,9 @@ +from lxml import etree +from saxonche import PySaxonProcessor + +# Parse the XSD file +XSD_SCHEMA = etree.XMLSchema(etree.parse("src/xsd/maindoc/UBL-Invoice-2.1.xsd", parser=None)) +proc = PySaxonProcessor(license=False) +xsltproc = proc.new_xslt30_processor() +SYNTAX_EXECUTABLE = xsltproc.compile_stylesheet(stylesheet_file="src/validation_artefacts/AUNZ-UBL-validation.xslt") +PEPPOL_EXECUTABLE = xsltproc.compile_stylesheet(stylesheet_file="src/validation_artefacts/AUNZ-PEPPOL-validation.xslt") diff --git a/src/database.py b/src/database.py index d741161..11cda30 100644 --- a/src/database.py +++ b/src/database.py @@ -24,7 +24,7 @@ class Evaluations(BaseModel): num_rules_failed = IntegerField() def to_json(self): - violations = Violations.select().where(Violations.evaluation == self.id) + violations = Violations.select().where(Violations.evaluation == self.id) # type: ignore return { "is_valid": self.is_valid, @@ -49,7 +49,7 @@ class Reports(BaseModel): def to_json(self): return { - "report_id": self.id, + "report_id": self.id, # type: ignore "date_generated": self.date_generated, "invoice_name": self.invoice_name, "invoice_text": self.invoice_text, diff --git a/src/export.py b/src/export.py index e511100..024d97e 100644 --- a/src/export.py +++ b/src/export.py @@ -162,3 +162,12 @@ def export_csv_report_v1(report_id: int): return csv_contents +def report_bulk_export_v1(report_ids, report_format) -> List: + report_format = report_format.lower() + print("Exporting reports") + if report_format == "json": + return [export_json_report_v1(report_id) for report_id in report_ids] + elif report_format == "html": + return [export_html_report_v1(report_id) for report_id in report_ids] + else: + raise Exception("Unknown report format") diff --git a/src/generation.py b/src/generation.py index 698b665..b4f9fbf 100644 --- a/src/generation.py +++ b/src/generation.py @@ -1,4 +1,5 @@ from src.type_structure import * +from src.constants import XSD_SCHEMA, SYNTAX_EXECUTABLE, PEPPOL_EXECUTABLE from lxml import etree from typing import Dict from saxonche import PySaxonProcessor @@ -21,7 +22,7 @@ def generate_wellformedness_evaluation(invoice_text: str) -> Evaluations: violations = [] try: - etree.fromstring(invoice_text.encode("utf-8")) + etree.fromstring(invoice_text.encode("utf-8"), parser=None) except etree.XMLSyntaxError as error: evaluation.is_valid = False evaluation.num_errors = 1 @@ -51,21 +52,17 @@ def generate_schema_evaluation(invoice_text: str) -> Evaluations: num_errors=0, num_rules_failed=0 ) - - # Parse the XSD file - xsd_doc = etree.parse("src/xsd/maindoc/UBL-Invoice-2.1.xsd") - xsd = etree.XMLSchema(xsd_doc) # Parse the XML data - xml_doc = etree.fromstring(invoice_text.encode("utf-8")) + xml_doc = etree.fromstring(invoice_text.encode("utf-8"), parser=None) violations = [] # Validate the XML against the XSD schema - if not xsd.validate(xml_doc): + if not XSD_SCHEMA.validate(xml_doc): evaluation.is_valid = False - for error in xsd.error_log: + for error in XSD_SCHEMA.error_log: evaluation.num_errors += 1 evaluation.num_rules_failed += 1 @@ -93,77 +90,69 @@ def generate_peppol_evaluation(invoice_text: str) -> Evaluations: return generate_xslt_evaluation("peppol", invoice_text) def generate_xslt_evaluation(aspect, invoice_text) -> Evaluations: - with PySaxonProcessor(license=False) as proc: - xsltproc = proc.new_xslt30_processor() - - if aspect == "syntax": - xslt_path = "src/validation_artefacts/AUNZ-UBL-validation.xslt" - else: - xslt_path = "src/validation_artefacts/AUNZ-PEPPOL-validation.xslt" - - executable = xsltproc.compile_stylesheet(stylesheet_file=xslt_path) - - if xsltproc.exception_occurred: - raise Exception("XSLT failed to load! " + xsltproc.error_message) - - tmp_filename = create_temp_file(invoice_text) - schematron_output = executable.transform_to_value(source_file=tmp_filename) - unlink(tmp_filename) - - if not schematron_output: - raise Exception("Could not generate evaluation due to bad XML!") - - violations = [] - - num_warnings = 0 - num_errors = 0 - rules_failed = set() - - output = schematron_output.item_at(0).get_node_value().children[0].children - - for item in output: - if item.name and item.name.endswith("failed-assert"): - id_name = item.get_attribute_value("id") - rules_failed.add(id_name) - is_fatal = item.get_attribute_value("flag") == "fatal" - - if is_fatal: - num_errors += 1 - else: - num_warnings += 1 - - xpath = item.get_attribute_value("location") - test = item.get_attribute_value("test") - - message = "" - suggestion = "" - if item.children: - message = item.children[0].string_value - - if len(item.children) > 1: - suggestion = item.children[1].string_value + if (aspect == "syntax"): + executable = SYNTAX_EXECUTABLE + else: + executable = PEPPOL_EXECUTABLE + + tmp_filename = create_temp_file(invoice_text) + schematron_output = executable.transform_to_value(source_file=tmp_filename) + unlink(tmp_filename) + + if not schematron_output: + raise Exception("Could not generate evaluation due to bad XML!") + + violations = [] + + num_warnings = 0 + num_errors = 0 + rules_failed = set() + + output = schematron_output.item_at(0).get_node_value().children[0].children + + for item in output: + if item.name and item.name.endswith("failed-assert"): + id_name = item.get_attribute_value("id") + rules_failed.add(id_name) + is_fatal = item.get_attribute_value("flag") == "fatal" + + if is_fatal: + num_errors += 1 + else: + num_warnings += 1 + + xpath = item.get_attribute_value("location") + test = item.get_attribute_value("test") + + message = "" + suggestion = "" + if item.children: + message = item.children[0].string_value - violations.append(Violations( - rule_id=id_name, - is_fatal=is_fatal, - xpath=xpath, - test=test, - message=message, - suggestion=suggestion - )) - - evaluation = Evaluations.create( - is_valid=num_errors == 0, - num_warnings=num_warnings, - num_errors=num_errors, - num_rules_failed=len(rules_failed) - ) - - for violation in violations: - violation.evaluation = evaluation.id - violation.save() - - return evaluation + if len(item.children) > 1: + suggestion = item.children[1].string_value + + violations.append(Violations( + rule_id=id_name, + is_fatal=is_fatal, + xpath=xpath, + test=test, + message=message, + suggestion=suggestion + )) + + evaluation = Evaluations.create( + is_valid=num_errors == 0, + num_warnings=num_warnings, + num_errors=num_errors, + num_rules_failed=len(rules_failed) + ) + + for violation in violations: + violation.evaluation = evaluation.id + violation.save() + + return evaluation def generate_report(invoice_name: str, invoice_text: str) -> int: wellformedness_evaluation = None @@ -206,7 +195,4 @@ def generate_report(invoice_name: str, invoice_text: str) -> int: peppol=peppol_evaluation.id if peppol_evaluation else None ) - print(report) - print(report.id) - return report.id diff --git a/src/helpers.py b/src/helpers.py index 834dd8b..580a9e0 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -1,5 +1,6 @@ from tempfile import NamedTemporaryFile from src.type_structure import * +import requests def create_temp_file(invoice_text: str) -> str: tmp = NamedTemporaryFile(mode='w', delete=False) diff --git a/src/invoice.py b/src/invoice.py index 751f632..1e14530 100644 --- a/src/invoice.py +++ b/src/invoice.py @@ -1,6 +1,5 @@ from typing import Dict from src.type_structure import * -from src.report import report_get_v1 from src.database import Users, Reports, Violations, Evaluations, db import requests from src.generation import generate_report @@ -29,24 +28,22 @@ def invoice_upload_url_v1(invoice_name: str, invoice_url: str): def invoice_upload_file_v1(invoice_name: str, invoice_file): - with open(invoice_file, 'rb') as f: - invoice_text = f.read() - - report_id = generate_report(invoice_name, invoice_text) + report_id = generate_report(invoice_name, invoice_file.decode("utf-8")) return { "report_id": report_id } def invoice_check_validity_v1(report_id: int) -> CheckValidReturn: - report = Reports.query.filter_by(id=report_id).one() + try: + report = Reports.get_by_id(report_id) + except DoesNotExist: + raise Exception(f"Report with id {report_id} not found") - return CheckValidReturn(is_valid=report.is_valid, invoice_hash=report.invoice_hash) + return CheckValidReturn(is_valid=report.is_valid) def invoice_generate_hash_v1(invoice: Invoice) -> str: return "hash" -def invoice_bulk_quick_fix_v1(invoices: List[Invoice]) -> List[Invoice]: - invoice = Invoice(name="invoice", source="text", data="") - invoice_list = [invoice] - return invoice_list +def invoice_file_upload_bulk_v1(invoices: List[Invoice]) -> List[int]: + return [generate_report(invoice.name, invoice.data) for invoice in invoices] diff --git a/src/report.py b/src/report.py index 943acb5..ac54ad5 100644 --- a/src/report.py +++ b/src/report.py @@ -1,9 +1,6 @@ from src.type_structure import * -from lxml import etree from typing import Dict -from saxonche import PySaxonProcessor from tempfile import NamedTemporaryFile -import requests from os import unlink from src.database import Users, Reports, Violations, Evaluations, db from src.helpers import extract_text_from_invoice @@ -14,81 +11,66 @@ def report_wellformedness_v1(invoice: Invoice) -> Evaluation: invoice_text = extract_text_from_invoice(invoice) evaluation = generate_wellformedness_evaluation(invoice_text) - return evaluation.to_json() + return Evaluation(**evaluation.to_json()) def report_schema_v1(invoice: Invoice) -> Evaluation: invoice_text = extract_text_from_invoice(invoice) evaluation = generate_schema_evaluation(invoice_text) - return evaluation.to_json() - + return Evaluation(**evaluation.to_json()) def report_syntax_v1(invoice: Invoice) -> Evaluation: data = extract_text_from_invoice(invoice) evaluation = generate_xslt_evaluation("syntax", data) - return evaluation.to_json() - + return Evaluation(**evaluation.to_json()) def report_peppol_v1(invoice: Invoice) -> Evaluation: data = extract_text_from_invoice(invoice) evaluation = generate_xslt_evaluation("peppol", data) - return evaluation.to_json() + return Evaluation(**evaluation.to_json()) -def report_get_v1(report_id: int) -> Report: - report = Report( - report_id=0, - score=0, - date_generated="", - invoice_name="", - invoice_raw="", - invoice_hash="", - is_valid=True, - total_num_violations=0, - wellformedness_evaluation=None, - schema_evaluation=None, - syntax_evaluation=None, - peppol_evaluation=None - ) - return report - -def report_list_all_v1(order_by: OrderBy) -> List[Report]: - report = Report( - report_id=0, - score=0, - date_generated="", - invoice_name="", - invoice_raw="", - invoice_hash="", - is_valid=True, - total_num_violations=0, - wellformedness_evaluation=None, - schema_evaluation=None, - syntax_evaluation=None, - peppol_evaluation=None - ) - reports = [report] - return reports +def report_list_all_v1() -> List[int]: + return [report.id for report in Reports.select()] -def report_export_v1(report_id, report_format) -> ReportExport: - export = ReportExport(url="", invoice_hash="") - return export +def report_list_by_v1(order_by: OrderBy) -> List[int]: + if order_by.is_ascending: + order = getattr(Reports, order_by.table).asc() + else: + order = getattr(Reports, order_by.table).desc() + + return [report.id for report in Reports.select().order_by(order)] def report_change_name_v1(report_id: int, new_name: str) -> Dict[None, None]: + try: + report = Reports.get_by_id(report_id) + except DoesNotExist: + raise Exception(f"Report with id {report_id} not found") + + report.invoice_name = new_name + report.save() + return {} def report_delete_v1(report_id: int) -> Dict[None, None]: + try: + report = Reports.get_by_id(report_id) + except DoesNotExist: + raise Exception(f"Report with id {report_id} not found") + + report.delete_instance() + return {} def report_bulk_generate_v1(invoices: List[Invoice]) -> List[Report]: report = Report( report_id=0, - score=0, date_generated="", invoice_name="", - invoice_raw="", + invoice_text="", invoice_hash="", is_valid=True, - total_num_violations=0, + total_warnings=0, + total_errors=0, wellformedness_evaluation=None, schema_evaluation=None, syntax_evaluation=None, @@ -96,9 +78,3 @@ def report_bulk_generate_v1(invoices: List[Invoice]) -> List[Report]: ) reports = [report] return reports - -def report_bulk_export_v1(report_ids, report_format) -> List[ReportExport]: - export = ReportExport(url="", invoice_hash="") - exports = [export] - return exports - diff --git a/src/server.py b/src/server.py index cd33816..d32d968 100644 --- a/src/server.py +++ b/src/server.py @@ -6,7 +6,7 @@ from src.export import * from src.type_structure import * from src.database import clear_v1 -from fastapi import FastAPI, Request, HTTPException, UploadFile +from fastapi import FastAPI, Request, HTTPException, UploadFile, File from fastapi.responses import Response, JSONResponse, HTMLResponse, StreamingResponse from src.error import AuthenticationError, InputError from io import BytesIO @@ -27,6 +27,10 @@ async def validation_exception_handler(request: Request, exc: Exception): # ENDPOINTS BELOW +@app.get("/") +async def welcome(): + return "Welcome to the Churros Validation API!" + @app.get("/health_check/v1") async def health_check(): return health_check_v1() @@ -40,8 +44,9 @@ async def invoice_upload_url(invoice_name: str, invoice_url: str) -> Dict: return invoice_upload_url_v1(invoice_name=invoice_name, invoice_url=invoice_url) @app.post("/invoice/upload_file/v1") -async def invoice_upload_file(invoice_file: UploadFile) -> Dict: - return invoice_upload_file_v1(invoice_name=invoice_file.filename, invoice_file=invoice_file.file) +async def invoice_upload_file(file: UploadFile = File(...)) -> Dict: + file_data = await file.read() + return invoice_upload_file_v1(invoice_name=file.filename, invoice_file=file_data) # type: ignore @app.get("/export/json_report/v1") async def export_json_report(report_id: int): @@ -49,7 +54,7 @@ async def export_json_report(report_id: int): @app.get("/export/pdf_report/v1") async def export_pdf_report(report_id: int): - pdf_file = BytesIO(export_pdf_report_v1(report_id)) + pdf_file = BytesIO(export_pdf_report_v1(report_id)) # type: ignore # Return the PDF as a streaming response headers = { @@ -88,29 +93,23 @@ async def report_syntax(invoice: Invoice) -> Evaluation: async def report_peppol(invoice: Invoice) -> Evaluation: return report_peppol_v1(invoice) -@app.get("/report/get/v1") -async def report_get(report_id: int) -> Report: - return report_get_v1(report_id) - @app.get("/report/list_all/v1") -async def report_list_all(order_by: OrderBy) -> List[Report]: - return report_list_all_v1(order_by) +async def report_list_all() -> List[int]: + return report_list_all_v1() -@app.get("/report/list_score/v1") -async def report_list_score(score: int, order_by: OrderBy) -> List[Report]: - return report_list_score_v1(score, order_by) +@app.get("/report/list_by/v1") +async def report_list_by(order_by: OrderBy) -> List[int]: + return report_list_by_v1(order_by) -# TODO: return type @app.put("/report/change_name/v1") async def report_change_name(report_id: int, new_name: str) -> Dict[None, None]: return report_change_name_v1(report_id, new_name) -# TODO: return type @app.delete("/report/delete/v1") async def report_delete(report_id: int) -> Dict[None, None]: return report_delete_v1(report_id) -@app.get("/invoice/check_validity/v1") +@app.get("/report/check_validity/v1") async def invoice_check_validity(report_id: int) -> CheckValidReturn: return invoice_check_validity_v1(report_id) @@ -118,18 +117,12 @@ async def invoice_check_validity(report_id: int) -> CheckValidReturn: async def invoice_generate_hash(invoice: Invoice) -> str: return invoice_generate_hash_v1(invoice) -# TODO: check return type -@app.post("/report/bulk_generate/v1") -async def report_bulk_generate(invoices: List[Invoice]) -> List[Report]: - return report_bulk_generate_v1(invoices) - -@app.get("/invoice/bulk_quick_fix/v1") -async def invoice_bulk_quick_fix(invoices: List[Invoice]) -> List[Invoice]: - return invoice_bulk_quick_fix_v1(invoices) +@app.post("/invoice/file_upload_bulk/v1") +async def invoice_file_upload_bulk(invoices: List[Invoice]) -> List[int]: + return invoice_file_upload_bulk_v1(invoices) -# TODO: check input and return type -@app.get("/report/bulk_export/v1") -async def report_bulk_export(report_ids: List[int], report_format: Format) -> List[ReportExport]: +@app.post("/report/bulk_export/v1") +async def report_bulk_export(report_ids: List[int], report_format: str) -> List: return report_bulk_export_v1(report_ids, report_format) diff --git a/src/setup_database.py b/src/setup_database.py deleted file mode 100644 index 3b975ef..0000000 --- a/src/setup_database.py +++ /dev/null @@ -1,17 +0,0 @@ -import psycopg2 -import os - -conn = psycopg2.connect( - host=os.environ['POSTGRES_HOST'], - port=os.environ['POSTGRES_PORT'], - user=os.environ['POSTGRES_USER'], - password=os.environ['POSTGRES_PASSWORD'], - database=os.environ['POSTGRES_DB'] -) - -cur = conn.cursor() -cur.execute('SELECT * FROM Reports') -rows = cur.fetchall() -print(rows) - -conn.close() diff --git a/src/type_structure.py b/src/type_structure.py index 3c00121..75a7824 100644 --- a/src/type_structure.py +++ b/src/type_structure.py @@ -4,12 +4,9 @@ Server_call_return = Dict[str, Any] class OrderBy(BaseModel): - attribute: Literal["score", "date_generated", "invoice_name", "total_num_violations"] + table: Literal["date_generated", "invoice_name", "total_errors", "total_warnings"] is_ascending: bool -class Format(BaseModel): - format: Literal["HTML", "PDF", "CSV"] - class Invoice(BaseModel): name: str source: str @@ -61,4 +58,3 @@ class ReportExport(BaseModel): class CheckValidReturn(BaseModel): is_valid: bool - invoice_hash: str diff --git a/src/classes/__init__.py b/tests/bulk/__init__.py similarity index 100% rename from src/classes/__init__.py rename to tests/bulk/__init__.py diff --git a/tests/bulk/bulk_export_test.py b/tests/bulk/bulk_export_test.py new file mode 100644 index 0000000..aa64f23 --- /dev/null +++ b/tests/bulk/bulk_export_test.py @@ -0,0 +1,37 @@ +from src.type_structure import * +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, replace_part_of_string +from tests.server_calls import invoice_file_upload_bulk_v1, report_bulk_export_v1 + +""" +===================================== +/report/bulk_export/v1 TESTS +===================================== +""" + +def test_bulk_export_valid(): + data = VALID_INVOICE_TEXT + invoice_valid = Invoice(name="My Invoice", source="text", data=data) + + data = invalidate_invoice(VALID_INVOICE_TEXT, "tag", "cac:BillingReference", "", "cac:BillingReferencee", 1) + data = invalidate_invoice(data, "tag", "cac:BillingReference", "", "cac:BillingReferencee", 1) + invoice_schema = Invoice(name="My Invoice", source="text", data=data) + + data = invalidate_invoice(VALID_INVOICE_TEXT, 'content', 'cbc:EndpointID', '', 'Not an ABN', 1) + invoice_peppol = Invoice(name="My Invoice", source="text", data=data) + + data = invalidate_invoice(VALID_INVOICE_TEXT, 'attrib', 'cbc:Amount', 'currencyID', 'TEST', 1) + invoice_syntax = Invoice(name="My Invoice", source="text", data=data) + + data = replace_part_of_string(VALID_INVOICE_TEXT, 2025, 2027, "id") + + invoice_wellformedness = Invoice(name="My Invoice", source="text", data=data) + + invoices = [invoice_valid, invoice_schema, invoice_peppol, invoice_syntax, invoice_wellformedness] + + report_ids = invoice_file_upload_bulk_v1(invoices) + + exports = report_bulk_export_v1(report_ids, "json") + + # Checking that the number of exports returned is the same as the number of invoices inputted + assert len(exports) == len(report_ids) == len(invoices) diff --git a/tests/bulk/bulk_upload_test.py b/tests/bulk/bulk_upload_test.py new file mode 100644 index 0000000..754903a --- /dev/null +++ b/tests/bulk/bulk_upload_test.py @@ -0,0 +1,36 @@ +from src.type_structure import * +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, replace_part_of_string +from tests.server_calls import invoice_file_upload_bulk_v1 + +""" +===================================== +/invoice/file_upload_bulk/v1 TESTS +===================================== +""" + +def test_bulk_upload_valid(): + data = VALID_INVOICE_TEXT + invoice_valid = Invoice(name="My Invoice", source="text", data=data) + + data = invalidate_invoice(VALID_INVOICE_TEXT, "tag", "cac:BillingReference", "", "cac:BillingReferencee", 1) + data = invalidate_invoice(data, "tag", "cac:BillingReference", "", "cac:BillingReferencee", 1) + invoice_schema = Invoice(name="My Invoice", source="text", data=data) + + data = invalidate_invoice(VALID_INVOICE_TEXT, 'content', 'cbc:EndpointID', '', 'Not an ABN', 1) + invoice_peppol = Invoice(name="My Invoice", source="text", data=data) + + data = invalidate_invoice(VALID_INVOICE_TEXT, 'attrib', 'cbc:Amount', 'currencyID', 'TEST', 1) + invoice_syntax = Invoice(name="My Invoice", source="text", data=data) + + data = replace_part_of_string(VALID_INVOICE_TEXT, 2025, 2027, "id") + + invoice_wellformedness = Invoice(name="My Invoice", source="text", data=data) + + invoices = [invoice_valid, invoice_schema, invoice_peppol, invoice_syntax, invoice_wellformedness] + + report_ids = invoice_file_upload_bulk_v1(invoices) + + # checking that the number of report_ids returned is the same as the number of invoices inputted + assert len(report_ids) == len(invoices) + diff --git a/tests/report/__init__.py b/tests/report/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/report/change_name_test.py b/tests/report/change_name_test.py new file mode 100644 index 0000000..6b9bac8 --- /dev/null +++ b/tests/report/change_name_test.py @@ -0,0 +1,26 @@ +from src.type_structure import * +from tests.server_calls import report_change_name_v1, export_json_report_v1, invoice_upload_text_v1 +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, remove_part_of_string + +""" +===================================== +/report/change_name/v1 TESTS +===================================== +""" + +# Testing that the report was generated properly and matches input data +def test_change_name(): + invoice = Invoice(name="My Invoice", source="text", data=VALID_INVOICE_TEXT) + report_id = invoice_upload_text_v1(invoice.name, invoice.data)["report_id"] + + report = Report(**export_json_report_v1(report_id)) + + # Checking for the old name of the invoice + assert report.invoice_name == "My Invoice" + + report_change_name_v1(report_id, "New Name") + report = Report(**export_json_report_v1(report_id)) + + # Checking for the new name of the invoice + assert report.invoice_name == "New Name" diff --git a/tests/report/check_validity_test.py b/tests/report/check_validity_test.py new file mode 100644 index 0000000..4e74950 --- /dev/null +++ b/tests/report/check_validity_test.py @@ -0,0 +1,20 @@ +from src.type_structure import * +from tests.server_calls import invoice_upload_text_v1, report_check_validity_v1 +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, remove_part_of_string, clear_database + +""" +===================================== +/report/check_validity/v1 TESTS +===================================== +""" + +def test_check_validity_one_report(): + report_id_valid = invoice_upload_text_v1("invoice", VALID_INVOICE_TEXT)["report_id"] + + invalid_invoice = VALID_INVOICE_TEXT[:1000] + report_id_invalid = invoice_upload_text_v1("invoice", invalid_invoice)["report_id"] + + assert report_check_validity_v1(report_id_valid)["is_valid"] == True + assert report_check_validity_v1(report_id_invalid)["is_valid"] == False + diff --git a/tests/report/delete_test.py b/tests/report/delete_test.py new file mode 100644 index 0000000..a97afb7 --- /dev/null +++ b/tests/report/delete_test.py @@ -0,0 +1,17 @@ +from src.type_structure import * +from tests.server_calls import report_delete_v1, export_json_report_v1, invoice_upload_text_v1 +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, remove_part_of_string + +""" +===================================== +/report/delete/v1 TESTS +===================================== +""" + +def test_delete(): + report_id = invoice_upload_text_v1("invoice", VALID_INVOICE_TEXT)["report_id"] + + report_delete_v1(report_id) + + assert export_json_report_v1(report_id)["code"] == 500 diff --git a/tests/report/list_all_test.py b/tests/report/list_all_test.py new file mode 100644 index 0000000..fddf928 --- /dev/null +++ b/tests/report/list_all_test.py @@ -0,0 +1,39 @@ +from src.type_structure import * +from tests.server_calls import report_list_all_v1, invoice_upload_text_v1, export_json_report_v1 +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, remove_part_of_string, clear_database + +""" +===================================== +/report/list_all/v1 TESTS +===================================== +""" + +def test_list_all_one_report(): + invoice = Invoice(name="My Invoice", source="text", data=VALID_INVOICE_TEXT) + invoice_upload_text_v1(invoice.name, invoice.data) + + report_ids = report_list_all_v1() + report = export_json_report_v1(report_ids[0]) + report = Report(**report) + + # Checking for the name of the invoice + assert report.invoice_name == "My Invoice" + + + +def test_list_all_many_reports(): + invoice = Invoice(name="My Invoice", source="text", data=VALID_INVOICE_TEXT) + invoice_upload_text_v1(invoice.name, invoice.data) + invoice_upload_text_v1(invoice.name, invoice.data) + invoice_upload_text_v1(invoice.name, invoice.data) + + report_ids = report_list_all_v1() + + assert len(report_ids) == 3 + + report = export_json_report_v1(report_ids[0]) + report = Report(**report) + + # Checking for the name of the invoice + assert report.invoice_name == "My Invoice" \ No newline at end of file diff --git a/tests/report/list_by_test.py b/tests/report/list_by_test.py new file mode 100644 index 0000000..73ad066 --- /dev/null +++ b/tests/report/list_by_test.py @@ -0,0 +1,27 @@ +from src.type_structure import * +from tests.server_calls import report_list_by_v1, invoice_upload_text_v1, export_json_report_v1 +from tests.constants import VALID_INVOICE_TEXT +from tests.helpers import invalidate_invoice, remove_part_of_string, clear_database + +""" +===================================== +/report/list_by/v1 TESTS +===================================== +""" + +def test_list_by_many_reports(): + invoice_upload_text_v1("invoice2", VALID_INVOICE_TEXT) + invoice_upload_text_v1("invoice1", VALID_INVOICE_TEXT) + invoice_upload_text_v1("invoice3", VALID_INVOICE_TEXT) + + report_ids1 = report_list_by_v1(OrderBy(table="invoice_name", is_ascending=True)) + report_ids2 = report_list_by_v1(OrderBy(table="invoice_name", is_ascending=False)) + + assert len(report_ids1) == len(report_ids2) == 3 + assert report_ids1[0] == report_ids2[-1] + + report = export_json_report_v1(report_ids1[0]) + report = Report(**report) + + # Checking for the name of the first invoice + assert report.invoice_name == "invoice1" diff --git a/tests/server_calls.py b/tests/server_calls.py index 4ccf512..dd5d34f 100644 --- a/tests/server_calls.py +++ b/tests/server_calls.py @@ -35,6 +35,11 @@ def invoice_upload_file_v1(invoice_name: str, invoice_filename) -> Server_call_r return json.loads(response.text) +def invoice_file_upload_bulk_v1(invoices: List[Invoice]) -> Server_call_return: + payload = [invoice.dict() for invoice in invoices] + response = requests.post(full_url + 'invoice/file_upload_bulk/v1', json=payload) + + return json.loads(response.text) # Export Endpoints @@ -99,6 +104,51 @@ def report_peppol_v1(invoice: Invoice) -> Server_call_return: return json.loads(response.text) +def report_list_all_v1() -> Server_call_return: + response = requests.get(full_url + 'report/list_all/v1') + + return json.loads(response.text) + +def report_list_by_v1(order_by: OrderBy) -> Server_call_return: + payload = order_by.dict() + response = requests.get(full_url + 'report/list_by/v1', json=payload) + + return json.loads(response.text) + +def report_delete_v1(report_id: int) -> Server_call_return: + payload = { + "report_id": report_id + } + response = requests.delete(full_url + 'report/delete/v1', params=payload) + + return json.loads(response.text) + +def report_change_name_v1(report_id: int, new_name: str) -> Server_call_return: + payload = { + "report_id": report_id, + "new_name": new_name + } + response = requests.put(full_url + 'report/change_name/v1', params=payload) + + return json.loads(response.text) + +def report_check_validity_v1(report_id: int) -> Server_call_return: + payload = { + "report_id": report_id + } + response = requests.get(full_url + 'report/check_validity/v1', params=payload) + + return json.loads(response.text) + + +def report_bulk_export_v1(report_ids: List[int], report_format: str) -> Server_call_return: + params = { + "report_format": report_format + } + response = requests.post(full_url + 'report/bulk_export/v1', json=report_ids, params=params) + + return response.json() + # Other Endpoints From d0a3d1c7a92db20905345f06352b9a5050f78f2f Mon Sep 17 00:00:00 2001 From: denzel-i <126758723+denzel-i@users.noreply.github.com> Date: Sun, 12 Mar 2023 18:21:50 +0700 Subject: [PATCH 2/2] Writing tests for export/json_report endpoint (#18) * Initial testing for report/json_report endpoint * Finished testing for report/json_report endpoint * Resolving merge conflict * Commenting out unimplemented tests * Changing the requirements for psycopg2-binary into 2.9.5 * Adding DoesNotExist import * Finishing tests for export/json_report endpoint * Deleted duplicate file - tests/report/json_report_test.py --------- Co-authored-by: dzl-i --- requirements.txt | 2 +- src/report.py | 1 + tests/export/json_report_test.py | 229 +++++++++++++++++++++++++++++-- 3 files changed, 222 insertions(+), 10 deletions(-) diff --git a/requirements.txt b/requirements.txt index 372dda7..ee521b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,4 +16,4 @@ weasyprint==58.1 beautifulsoup4==4.11.2 peewee==3.16.0 python-multipart==0.0.6 -psycopg2-binary==2.8.6 +psycopg2-binary==2.9.5 diff --git a/src/report.py b/src/report.py index ac54ad5..3134791 100644 --- a/src/report.py +++ b/src/report.py @@ -5,6 +5,7 @@ from src.database import Users, Reports, Violations, Evaluations, db from src.helpers import extract_text_from_invoice from src.generation import generate_xslt_evaluation, generate_schema_evaluation, generate_wellformedness_evaluation +from peewee import DoesNotExist def report_wellformedness_v1(invoice: Invoice) -> Evaluation: diff --git a/tests/export/json_report_test.py b/tests/export/json_report_test.py index 8f37795..90f08fd 100644 --- a/tests/export/json_report_test.py +++ b/tests/export/json_report_test.py @@ -5,13 +5,15 @@ """ ===================================== -/report/json_report/v1 TESTS +/export/json_report/v1 TESTS ===================================== """ # Testing that the report was generated properly and matches input data def test_json_valid_invoice(): - invoice = Invoice(name="My Invoice", source="text", data=VALID_INVOICE_TEXT) + data = VALID_INVOICE_TEXT + + invoice = Invoice(name="My Invoice", source="text", data=data) report_id = invoice_upload_text_v1(invoice.name, invoice.data)["report_id"] report = export_json_report_v1(report_id) @@ -26,9 +28,6 @@ def test_json_valid_invoice(): # Checking for the name of the invoice assert report.invoice_name == "My Invoice" - # Checking for invoice text - assert report.invoice_text == invoice.data - # Invoice hash must be a string assert isinstance(report.invoice_hash, str) @@ -36,30 +35,242 @@ def test_json_valid_invoice(): assert report.is_valid # A valid invoice should have 0 violations + assert report.total_warnings == 0 assert report.total_errors == 0 # Check for wellformedness assert report.wellformedness_evaluation.is_valid == True assert report.wellformedness_evaluation.num_rules_failed == 0 assert report.wellformedness_evaluation.num_errors == 0 - assert report.wellformedness_evaluation.violations == [] + assert report.wellformedness_evaluation.num_warnings == 0 + assert len(report.wellformedness_evaluation.violations) == 0 # Check for schema assert report.schema_evaluation.is_valid == True assert report.schema_evaluation.num_rules_failed == 0 assert report.schema_evaluation.num_errors == 0 - assert report.schema_evaluation.violations == [] + assert report.schema_evaluation.num_warnings == 0 + assert len(report.schema_evaluation.violations) == 0 # Check for syntax assert report.syntax_evaluation.is_valid == True assert report.syntax_evaluation.num_rules_failed == 0 assert report.syntax_evaluation.num_errors == 0 - assert report.syntax_evaluation.violations == [] + assert report.syntax_evaluation.num_warnings == 0 + assert len(report.syntax_evaluation.violations) == 0 # Check for PEPPOL assert report.peppol_evaluation.is_valid == True assert report.peppol_evaluation.num_rules_failed == 0 assert report.peppol_evaluation.num_errors == 0 - assert report.peppol_evaluation.violations == [] + assert report.syntax_evaluation.num_warnings == 0 + assert len(report.peppol_evaluation.violations) == 0 + +def test_json_unique_id(): + data = VALID_INVOICE_TEXT + + # Creating 2 invoices + invoice1 = Invoice(name="Invoice01", source="text", data=data) + invoice2 = Invoice(name="Invoice02", source="text", data=data) + + # Creating 2 reports + report_id1 = invoice_upload_text_v1(invoice1.name, invoice1.data)["report_id"] + report1 = export_json_report_v1(report_id1) + report1 = Report(**report1) + report_id2 = invoice_upload_text_v1(invoice2.name, invoice2.data)["report_id"] + report2 = export_json_report_v1(report_id2) + report2 = Report(**report2) + + # Check that the report_id is not the same + report1.report_id != report2.report_id + + # Check names of the invoice + report1.invoice_name == "Invoice01" + report2.invoice_name == "Invoice02" + + # Check that the hash is not the same + report1.invoice_hash != report2.invoice_hash + +# Testing that a single rule fails when there is one error in the invoice +def test_json_single_violation(): + data = VALID_INVOICE_TEXT + + # Invalidating the currency code + data = invalidate_invoice(data, "attrib", "cbc:Amount", "currencyID", "TEST", 1) + + invoice = Invoice(name="Invoice Test", source="text", data=data) + + report_id = invoice_upload_text_v1(invoice.name, invoice.data)["report_id"] + report = export_json_report_v1(report_id) + report = Report(**report) + + # Check the name of the invoice + assert report.invoice_name == "Invoice Test" + + # Invalidating a currency code is fatal so report must not be valid + assert report.is_valid == False + + # 1 error from syntax and 2 errors from peppol + assert report.total_warnings == 0 + assert report.total_errors == 3 + + # Check for wellformedness + assert report.wellformedness_evaluation.is_valid == True + assert report.wellformedness_evaluation.num_rules_failed == 0 + assert report.wellformedness_evaluation.num_errors == 0 + assert report.wellformedness_evaluation.num_warnings == 0 + assert len(report.wellformedness_evaluation.violations) == 0 + + # Check for schema + assert report.schema_evaluation.is_valid == True + assert report.schema_evaluation.num_rules_failed == 0 + assert report.schema_evaluation.num_errors == 0 + assert report.schema_evaluation.num_warnings == 0 + assert len(report.schema_evaluation.violations) == 0 + + # Check for syntax + assert report.syntax_evaluation.is_valid == False + assert report.syntax_evaluation.num_rules_failed == 1 + assert report.syntax_evaluation.num_errors == 1 + assert report.syntax_evaluation.num_warnings == 0 + assert len(report.syntax_evaluation.violations) == 1 + + # Check for PEPPOL + assert report.peppol_evaluation.is_valid == False + assert report.peppol_evaluation.num_rules_failed == 2 + assert report.peppol_evaluation.num_errors == 2 + assert report.syntax_evaluation.num_warnings == 0 + assert len(report.peppol_evaluation.violations) == 2 + +# Testing that multiple violations are generated when there are multiple errors in the invoice +def test_json_multiple_violations_same_rule(): + data = VALID_INVOICE_TEXT + + # Invalidating the 2 ABNs + data = invalidate_invoice(data, "content", "cbc:EndpointID", "", "Not an ABN 1", 1) + data = invalidate_invoice(data, "content", "cbc:EndpointID", "", "Not an ABN 2", 2) + + invoice = Invoice(name="Invoice Test", source="text", data=data) + + report_id = invoice_upload_text_v1(invoice.name, invoice.data)["report_id"] + report = export_json_report_v1(report_id) + report = Report(**report) + + # Invalidating an ABN is only a warning so report must still be valid + assert report.is_valid + + # Invalidating 2 ABNs - 2 violation + assert report.total_warnings == 2 + assert report.total_errors == 0 + + # Check for wellformedness + assert report.wellformedness_evaluation.is_valid == True + assert report.wellformedness_evaluation.num_rules_failed == 0 + assert report.wellformedness_evaluation.num_errors == 0 + assert report.wellformedness_evaluation.num_warnings == 0 + assert len(report.wellformedness_evaluation.violations) == 0 + + # Check for schema + assert report.schema_evaluation.is_valid == True + assert report.schema_evaluation.num_rules_failed == 0 + assert report.schema_evaluation.num_errors == 0 + assert report.schema_evaluation.num_warnings == 0 + assert len(report.schema_evaluation.violations) == 0 + + # Check for syntax + assert report.syntax_evaluation.is_valid == True + assert report.syntax_evaluation.num_rules_failed == 0 + assert report.syntax_evaluation.num_errors == 0 + assert report.syntax_evaluation.num_warnings == 0 + assert len(report.syntax_evaluation.violations) == 0 + + # Check for PEPPOL + assert report.peppol_evaluation.is_valid == True + assert report.peppol_evaluation.num_rules_failed == 1 + assert report.peppol_evaluation.num_errors == 0 + assert report.peppol_evaluation.num_warnings == 2 + assert len(report.peppol_evaluation.violations) == 2 +def test_json_multiple_violations_different_rules(): + data = VALID_INVOICE_TEXT + + # Invalidating the 2 ABNs (PEPPOL) + data = invalidate_invoice(data, "content", "cbc:EndpointID", "", "Not an ABN 1", 1) + data = invalidate_invoice(data, "content", "cbc:EndpointID", "", "Not an ABN 2", 2) + + # Invalidating the 2 Country/IdentificationCode (Syntax) + data = invalidate_invoice(data, 'content', 'cbc:IdentificationCode', '', 'TEST', 1) + data = invalidate_invoice(data, 'content', 'cbc:IdentificationCode', '', 'TEST', 2) + + invoice = Invoice(name="Invoice Test", source="text", data=data) + + report_id = invoice_upload_text_v1(invoice.name, invoice.data)["report_id"] + report = export_json_report_v1(report_id) + report = Report(**report) + + # Invalidating an Country/IdentificationCode is fatal so report must be invalid + assert report.is_valid == False + + # 2 Warnings and 2 Errors + assert report.total_warnings == 2 + assert report.total_errors == 2 + + # Check for wellformedness + assert report.wellformedness_evaluation.is_valid == True + assert report.wellformedness_evaluation.num_rules_failed == 0 + assert report.wellformedness_evaluation.num_errors == 0 + assert report.wellformedness_evaluation.num_warnings == 0 + assert len(report.wellformedness_evaluation.violations) == 0 + + # Check for schema + assert report.schema_evaluation.is_valid == True + assert report.schema_evaluation.num_rules_failed == 0 + assert report.schema_evaluation.num_errors == 0 + assert report.schema_evaluation.num_warnings == 0 + assert len(report.schema_evaluation.violations) == 0 + + # Check for syntax + assert report.syntax_evaluation.is_valid == False + assert report.syntax_evaluation.num_rules_failed == 1 + assert report.syntax_evaluation.num_errors == 2 + assert report.syntax_evaluation.num_warnings == 0 + assert len(report.syntax_evaluation.violations) == 2 + + # Check for PEPPOL + assert report.peppol_evaluation.is_valid == True + assert report.peppol_evaluation.num_rules_failed == 1 + assert report.peppol_evaluation.num_errors == 0 + assert report.peppol_evaluation.num_warnings == 2 + assert len(report.peppol_evaluation.violations) == 2 + +# Testing invalid wellformedness +def test_json_invalid_wellformedness(): + data = VALID_INVOICE_TEXT + + # Removing a closing tag + data = remove_part_of_string(data, 11530, 11540) + + invoice = Invoice(name="Invoice Test", source="text", data=data) + + report_id = invoice_upload_text_v1(invoice.name, invoice.data)["report_id"] + report = export_json_report_v1(report_id) + report = Report(**report) + + # Removing a closing tag is a fatal error + assert report.is_valid == False + + # Violating 1 rule + assert report.total_warnings == 0 + assert report.total_errors == 1 + + # Check for wellformedness + assert report.wellformedness_evaluation.is_valid == False + assert report.wellformedness_evaluation.num_rules_failed == 1 + assert report.wellformedness_evaluation.num_errors == 1 + assert report.wellformedness_evaluation.num_warnings == 0 + assert len(report.wellformedness_evaluation.violations) == 1 + # Invalid wellformedness means no schema, syntax and peppol evaluation + assert report.schema_evaluation == None + assert report.syntax_evaluation == None + assert report.peppol_evaluation == None