diff --git a/custom_json_diff/lib/custom_diff_classes.py b/custom_json_diff/lib/custom_diff_classes.py index 50cf539..6dabf50 100644 --- a/custom_json_diff/lib/custom_diff_classes.py +++ b/custom_json_diff/lib/custom_diff_classes.py @@ -19,6 +19,14 @@ logger = logging.getLogger(__name__) +class Array(list): + def __init__(self, *args): + super().__init__(*args) + + def __eq__(self, other): + return all((all(i in other for i in self), all(i in self for i in other))) + + @dataclass class Options: # type: ignore allow_new_data: bool = False @@ -140,15 +148,15 @@ def __init__(self, comp: Dict, options: Options): self.component_type = comp.get("type", "") self.description = comp.get("description", "") self.evidence = comp.get("evidence", {}) - self.external_references = comp.get("externalReferences", []) + self._external_references = Array(comp.get("externalReferences", [])) self.group = comp.get("group", "") - self.hashes = comp.get("hashes", []) - self.licenses = comp.get("licenses", []) + self._hashes = Array(comp.get("hashes", [])) + self._licenses = Array(comp.get("licenses", [])) self.name = comp.get("name", "") self.options = options # deprecated self.original_data = comp - self.properties = comp.get("properties", []) + self._properties = Array(comp.get("properties", [])) self.publisher = comp.get("publisher", "") self.purl = comp.get("purl", "") self.scope = comp.get("scope", []) @@ -172,27 +180,58 @@ def __ne__(self, other): return not self == other def _check_list_eq(self, other): - # Since these elements have been sorted, we can compare them directly return (self.properties == other.properties and self.evidence == other.evidence and self.hashes == other.hashes and self.licenses == other.licenses) + @property + def external_references(self): + return self._external_references + + @external_references.setter + def external_references(self, value): + self._external_references = Array(value) + + @property + def licenses(self): + return self._licenses + + @licenses.setter + def licenses(self, value): + self._licenses = Array(value) + + @property + def hashes(self): + return self._hashes + + @hashes.setter + def hashes(self, value): + self._hashes = Array(value) + + @property + def properties(self): + return self._properties + + @properties.setter + def properties(self, value): + self._properties = Array(value) + def to_dict(self): return filter_empty(self.options.include_empty, { "author": self.author, "bom-ref": self.bom_ref, "type": self.component_type, "description": self.description, "evidence": self.evidence, - "externalReferences": self.external_references, "group": self.group, - "hashes": self.hashes, "licenses": self.licenses, "name": self.name, - "properties": self.properties, "publisher": self.publisher, "purl": self.purl, + "externalReferences": self._external_references, "group": self.group, + "hashes": self._hashes, "licenses": self._licenses, "name": self.name, + "properties": self._properties, "publisher": self.publisher, "purl": self.purl, "scope": self.scope, "version": self.version}) class BomDependency: def __init__(self, dep: Dict, options: "Options"): self.ref = dep.get("ref", "") - self.deps = dep.get("dependsOn", []) + self._deps = Array(dep.get("dependsOn", [])) # deprecated self.original_data = {} - self.ref_no_version, self.deps_no_version = import_bom_dependency( + self.ref_no_version, self._deps_no_version = import_bom_dependency( dep, options.allow_new_versions) if options.allow_new_versions else "", [] self.options = options @@ -209,12 +248,28 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + @property + def deps(self): + return self._deps + + @deps.setter + def deps(self, value): + self._deps = Array(value) + + @property + def deps_no_version(self): + return self._deps_no_version + + @deps_no_version.setter + def deps_no_version(self, value): + self._deps_no_version = Array(value) + def clear(self): options = self.options self.__init__(dep={}, options=options) def to_dict(self): - return {"ref": self.ref, "dependsOn": self.deps} + return {"ref": self.ref, "dependsOn": self._deps} class BomDicts: @@ -223,7 +278,7 @@ def __init__(self, options: "Options", filename: str, original_data: Dict, other dependencies: List | None = None, vulnerabilities: List | None = None): self.options = options self.options.doc_num = 1 if filename == options.file_1 else 2 - self.misc_data, self.components, self.services, self.dependencies, self.vdrs = import_bom_dict( + self.misc_data, self._components, self._services, self._dependencies, self._vdrs = import_bom_dict( self.options, original_data, other_data, components, services, dependencies, vulnerabilities) self.filename = filename @@ -268,6 +323,38 @@ def __sub__(self, other): new_bom_dict.options.doc_num = 1 return new_bom_dict + @property + def components(self): + return self._components + + @components.setter + def components(self, value): + _, self._components, _, _, _ = import_bom_dict(self.options, {}, components=value) + + @property + def services(self): + return self._services + + @services.setter + def services(self, value): + _, _, self._services, _, _ = import_bom_dict(self.options, {}, services=value) + + @property + def dependencies(self): + return self._dependencies + + @dependencies.setter + def dependencies(self, value): + _, _, _, self._dependencies, _ = import_bom_dict(self.options, {}, dependencies=value) + + @property + def vdrs(self): + return self._vdrs + + @vdrs.setter + def vdrs(self, value): + _, _, _, _, self._vdrs = import_bom_dict(self.options, {}, vulnerabilities=value) + def intersection(self, other, title: str = "") -> "BomDicts": components = [] dependencies = [] @@ -348,7 +435,7 @@ def __init__(self, svc: Dict, options: "Options"): self.search_key = create_comp_key(svc, options.svc_keys) self.original_data = svc self.name = svc.get("name", "") - self.endpoints = svc.get("endpoints", []) + self._endpoints = Array(svc.get("endpoints", [])) self.authenticated = svc.get("authenticated", "") self.x_trust_boundary = svc.get("x-trust-boundary", "") self.options = options @@ -359,56 +446,46 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + @property + def endpoints(self): + return self._endpoints + + @endpoints.setter + def endpoints(self, value): + self._endpoints = Array(value) + def to_dict(self): return filter_empty(self.options.include_empty, { "name": self.name, - "endpoints": self.endpoints, + "endpoints": self._endpoints, "authenticated": self.authenticated, "x-trust-boundary": self.x_trust_boundary }) -@dataclass class BomVdr: """Class for holding bom vulnerability data""" - id: str = "" - bom_ref: str = "" - advisories: list = field(default_factory=list) - affects: List = field(default_factory=list) - analysis: Dict = field(default_factory=dict) - cwes: List = field(default_factory=list) - data: Dict = field(default_factory=dict) - description: str = "" - detail: str = "" - properties: List = field(default_factory=list) - published: str = "" - ratings: list = field(default_factory=list) - recommendation: str = "" - references: list = field(default_factory=list) - source: Dict = field(default_factory=dict) - updated: str = "" - options: "Options" = field(default_factory=lambda: Options()) # type: ignore - - def __post_init__(self): - if not self.options: - self.options = Options() - self.id = self.id or (self.data.get("id") or "") - self.bom_ref = self.bom_ref or (self.data.get("bom-ref") or "") - self.advisories = self.advisories or (self.data.get("advisories") or []) - self.affects = self.affects or (self.data.get("affects") or []) - if self.affects and not isinstance(self.affects[0], BomVdrAffects): - self.affects = [BomVdrAffects(i, self.options) for i in self.affects] - self.analysis = self.analysis or (self.data.get("analysis") or {}) - self.cwes = self.cwes or (self.data.get("cwes") or []) - self.description = self.description or (self.data.get("description") or "") - self.detail = self.detail or (self.data.get("detail") or "") - self.properties = self.properties or (self.data.get("properties") or []) - self.published = self.published or (self.data.get("published") or "") - self.ratings = self.ratings or (self.data.get("ratings") or []) - self.recommendation = self.recommendation or (self.data.get("recommendation") or "") - self.references = self.references or (self.data.get("references") or []) - self.source = self.source or (self.data.get("source") or {}) - self.updated = self.updated or (self.data.get("updated") or "") + def __init__(self, data: Dict = None, options: "Options" = Options(), **kwargs): + if not data: + data = {} + self.options = options + self.id = kwargs.get("id") or (data.get("id") or "") + self.bom_ref = kwargs.get("bom_ref") or (data.get("bom-ref") or "") + self._advisories = Array(kwargs.get("advisories") or (data.get("advisories") or [])) + self._affects = Array(BomVdrAffects(i, options) for i in (kwargs.get("affects") or (data.get("affects") or []))) + if self._affects and not isinstance(self.affects[0], BomVdrAffects): + self._affects = Array([BomVdrAffects(i, self.options) for i in self._affects]) + self.analysis = kwargs.get("analysis") or (data.get("analysis") or {}) + self._cwes = Array(kwargs.get("cwes") or (data.get("cwes") or [])) + self.description = kwargs.get("description") or (data.get("description") or "") + self.detail = kwargs.get("detail") or (data.get("detail") or "") + self._properties = Array(kwargs.get("properties") or (data.get("properties") or [])) + self.published = kwargs.get("published") or (data.get("published") or "") + self._ratings = Array(kwargs.get("ratings") or (data.get("ratings") or [])) + self.recommendation = kwargs.get("recommendation") or (data.get("recommendation") or "") + self._references = Array(kwargs.get("references") or (data.get("references") or [])) + self.source = kwargs.get("source") or (data.get("source") or {}) + self.updated = kwargs.get("updated") or (data.get("updated") or "") def __eq__(self, other): if self.affects and not isinstance(self.affects[0], BomVdrAffects): @@ -430,19 +507,69 @@ def _field_eq(self, other): excludes bom-ref, affects, updated""" return all(( self.id == other.id, - self.advisories == other.advisories, + self._advisories == other._advisories, self.analysis == other.analysis, self.cwes == other.cwes, self.description == other.description, self.detail == other.detail, - self.properties == other.properties, + self._properties == other._properties, self.published == other.published, - self.ratings == other.ratings, + self._ratings == other._ratings, self.recommendation == other.recommendation, - self.references == other.references, + self._references == other._references, self.source == other.source, )) + @property + def advisories(self): + return self._advisories + + @advisories.setter + def advisories(self, value): + self._advisories = Array(value) + + @property + def affects(self): + return self._affects + + @affects.setter + def affects(self, value): + if value and not isinstance(value[0], BomVdrAffects): + value = [BomVdrAffects(i, self.options) for i in value] + self._affects = Array(value) + + @property + def cwes(self): + return self._cwes + + @cwes.setter + def cwes(self, value): + self._cwes = Array(value) + + @property + def properties(self): + return self._properties + + @properties.setter + def properties(self, value): + self._properties = Array(value) + + @property + def references(self): + return self._references + + @references.setter + def references(self, value): + self._references = Array(value) + + @property + def ratings(self): + return self._ratings + + @ratings.setter + def ratings(self, value): + self._ratings = Array(value) + def clear(self): options = self.options self.__init__() @@ -452,17 +579,17 @@ def to_dict(self): return filter_empty(self.options.include_empty, { "id": self.id, "bom-ref": self.bom_ref, - "advisories": self.advisories, - "affects": [i.to_dict() for i in self.affects], + "advisories": self._advisories, + "affects": [i.to_dict() for i in self._affects], "analysis": self.analysis, "cwes": self.cwes, "description": self.description, "detail": self.detail, - "properties": self.properties, + "properties": self._properties, "published": self.published, - "ratings": self.ratings, + "ratings": self._ratings, "recommendation": self.recommendation, - "references": self.references, + "references": self._references, "source": self.source, "updated": self.updated, }) @@ -473,7 +600,7 @@ def __init__(self, data: Dict, options: "Options"): self.data = data self.options = options self.ref = data.get("ref", "") - self.versions = data.get("", []) + self._versions = Array(data.get("versions", [])) def __eq__(self, other): if self.data == other.data: @@ -491,15 +618,23 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + @property + def versions(self): + return self._versions + + @versions.setter + def versions(self, value): + self._versions = Array(value) + def to_dict(self): - return filter_empty(self.options.include_empty, {"ref": self.ref, "versions": self.versions}) + return filter_empty(self.options.include_empty, {"ref": self.ref, "versions": self._versions}) class CsafDicts: def __init__(self, options: "Options", filename: str, original_data: Dict | None = None, document: FlatDicts | None = None, product_tree: FlatDicts | None = None, vulnerabilities: List | None = None): - self.document, self.product_tree, self.vulnerabilities = import_csaf( + self.document, self.product_tree, self._vulnerabilities = import_csaf( options, original_data, document, product_tree, vulnerabilities) self.options = options self.options.doc_num = 1 if filename == options.file_1 else 2 @@ -518,7 +653,7 @@ def __ne__(self, other): def __sub__(self, other): document = self.document - other.document product_tree = self.product_tree - other.product_tree - vulnerabilities = [i for i in self.vulnerabilities if i not in other.vulnerabilities] + vulnerabilities = Array([i for i in self.vulnerabilities if i not in other.vulnerabilities]) filename = self.filename options = deepcopy(self.options) return CsafDicts( @@ -530,6 +665,14 @@ def __sub__(self, other): vulnerabilities=vulnerabilities ) + @property + def vulnerabilities(self): + return self._vulnerabilities + + @vulnerabilities.setter + def vulnerabilities(self, value): + self._vulnerabilities = Array(value) + def get_refs(self): return {"vulnerabilities": {i.title for i in self.vulnerabilities}} @@ -561,14 +704,12 @@ def to_summary(self) -> Dict: class CsafScore: def __init__(self, data: Dict, options: "Options"): self.cvss_v3 = data.get("cvss_v3", {}) - self.products = data.get("products", []) + self._products = Array(data.get("products", [])) self.options = options def __eq__(self, other): if not self.options.allow_new_data: - self.cvss_v3 == other.cvss_v3 and all( - i in other.products for i in self.products) and all( - i in self.products for i in other.products) + return self.cvss_v3 == other.cvss_v3 and self._products == other._products a, b = order_documents(self, other) if a.cvss_v3 and a.cvss_v3 != b.cvss_v3: return False @@ -577,42 +718,61 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + @property + def products(self): + return self._products + + @products.setter + def products(self, value): + self._products = Array(value) + def to_dict(self): return filter_empty(self.options.include_empty, { "cvss_v3": self.cvss_v3, - "products": self.products + "products": self._products }) class CsafVulnerability: - def __init__(self, data: Dict, options: "Options"): - self.acknowledgements = data.get("acknowledgements", []) - self.cve = data.get("cve", "") - self.cwe = data.get("cwe", "") - self.discovery_date = data.get("discovery_date", "") - self.ids = data.get("ids", []) - self.notes = data.get("notes", []) + def __init__(self, data: Dict, options: "Options", **kwargs): + self._acknowledgements = Array(data.get("acknowledgements") or kwargs.get("acknowledgements", [])) + self.cve = data.get("cve") or kwargs.get("cve", "") + self.cwe = data.get("cwe") or kwargs.get("cwe", "") + self.discovery_date = data.get("discovery_date") or kwargs.get("discovery_date", "") + self._ids = Array(data.get("ids") or kwargs.get("ids", [])) + self._notes = Array(data.get("notes") or kwargs.get("notes", [])) self.options = options - self.product_status = data.get("product_status", {}) - self.references = data.get("references", []) - self.scores = [CsafScore(i, options) for i in data.get("scores", [])] - self.title = data.get("title", "") + self.product_status = data.get("product_status") or kwargs.get("product_status", {}) + self._references = Array(data.get("references") or kwargs.get("references", [])) + self._scores = Array([CsafScore(i, options) for i in (data.get("scores") or kwargs.get("scores", []))]) + self.title = data.get("title") or kwargs.get("title", "") def __eq__(self, other): if not self.options.allow_new_data: - return self.to_dict() == other.to_dict() - attributes_to_compare = [('cve', lambda self, other: self.cve == other.cve), - ('cwe', lambda self, other: self.cwe == other.cwe), - ('discovery_date', lambda self, other: self.discovery_date == other.discovery_date), - ('product_status', lambda self, other: self.product_status == other.product_status), ( + return all(( + self.cve == other.cve, + self.cwe == other.cwe, + self.discovery_date == other.discovery_date, + self.product_status == other.product_status, + self.acknowledgements == other._acknowledgements, + self.ids == other.ids, + self.notes == other.notes, + self.references == other.references, + self.scores == other.scores, + self.title == other.title + )) + attributes_to_compare = [('cve', lambda a, b: self.cve == other.cve), + ('cwe', lambda a, b: self.cwe == other.cwe), + ('discovery_date', lambda a, b: self.discovery_date == other.discovery_date), + ('product_status', lambda a, b: self.product_status == other.product_status), ( 'acknowledgements', - lambda self, other: advanced_eq_lists(self.acknowledgements, other.acknowledgements)), - ('ids', lambda self, other: advanced_eq_lists(self.ids, other.ids)), - ('notes', lambda self, other: advanced_eq_lists(self.notes, other.notes)), ( + lambda a, b: advanced_eq_lists(self.acknowledgements, other.acknowledgements)), + ('ids', lambda a, b: advanced_eq_lists(self.ids, other.ids)), + ('notes', lambda a, b: advanced_eq_lists(self.notes, other.notes)), ( 'references', - lambda self, other: advanced_eq_lists(self.references, other.references)), - ('scores', lambda self, other: advanced_eq_lists(self.scores, other.scores)), - ('title', lambda self, other: self.title == other.title),] + lambda a, b: advanced_eq_lists(self.references, other.references)), + ('scores', lambda a, b: advanced_eq_lists(self.scores, other.scores)), + ('title', lambda a, b: self.title == other.title),] return not any( getattr(self, attr) and not compare(self, other) for attr, compare in attributes_to_compare @@ -621,20 +781,62 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + @property + def acknowledgements(self): + return self._acknowledgements + + @acknowledgements.setter + def acknowledgements(self, value): + self._acknowledgements = Array(value) + + @property + def ids(self): + return self._ids + + @ids.setter + def ids(self, value): + self._ids = Array(value) + + @property + def notes(self): + return self._notes + + @notes.setter + def notes(self, value): + self._notes = Array(value) + + @property + def references(self): + return self._references + + @references.setter + def references(self, value): + self._references = Array(value) + + @property + def scores(self): + return self._scores + + @scores.setter + def scores(self, value): + if value and not isinstance(value[0], CsafScore): + value = [CsafScore(i, self.options) for i in value] + self._scores = Array(value) + def clear(self): options = self.options self.__init__(data={}, options=options) def to_dict(self): return filter_empty(self.options.include_empty, { - "acknowledgements": self.acknowledgements, + "acknowledgements": self._acknowledgements, "cve": self.cve, "cwe": self.cwe, "discovery_date": self.discovery_date, - "ids": self.ids, - "notes": self.notes, + "ids": self._ids, + "notes": self._notes, "product_status": self.product_status, - "references": self.references, + "references": self._references, "scores": [i.to_dict() for i in self.scores], "title": self.title }) @@ -786,7 +988,7 @@ def import_bom_dependency(data: Dict, allow_new_versions: bool) -> Tuple[str, Li d, _ = split_bom_ref(dep) new_deps.append(d) deps = new_deps - return ref, deps + return ref, Array(deps) def import_bom_dict( @@ -804,7 +1006,7 @@ def import_bom_dict( if not value: elements[i] = [] components, services, dependencies, vulnerabilities = elements - return other_data, components, services, dependencies, vulnerabilities # type: ignore + return other_data, Array(components), Array(services), Array(dependencies), Array(vulnerabilities) # type: ignore def import_csaf(options: "Options", original_data: Dict | None = None, document: FlatDicts | None = None, @@ -814,15 +1016,24 @@ def import_csaf(options: "Options", original_data: Dict | None = None, document: if document or product_tree or vex: logger.warning("Both source dict and parsed elements included. Using source dict.") return FlatDicts(original_data.get("document", {})), FlatDicts( - original_data.get("product_tree", {})), [ - CsafVulnerability(i, options) for i in original_data.get("vulnerabilities", [])] - return document or FlatDicts({}), product_tree or FlatDicts({}), vex or [] + original_data.get("product_tree", {})), Array([ + CsafVulnerability(i, options) for i in original_data.get("vulnerabilities", [])]) + else: + if vex: + if not isinstance(vex[0], CsafVulnerability): + vex = [CsafVulnerability(i, options) for i in vex] + vex = Array(vex) + if document and not isinstance(document, FlatDicts): + document = FlatDicts(document) # type: ignore + if product_tree and not isinstance(product_tree, FlatDicts): + product_tree = FlatDicts(product_tree) # type: ignore + return document or FlatDicts({}), product_tree or FlatDicts({}), vex or Array([]) def import_flat_dict(data: Dict | List[FlatElement]) -> List[FlatElement]: if not data: return [] - if data and isinstance(data, List) and isinstance(data[0], FlatElement): + if isinstance(data, List) and isinstance(data[0], FlatElement): return data if not isinstance(data, Dict): raise TypeError("data must be a dict or list of FlatElement")