diff --git a/surfactant/cmd/config.py b/surfactant/cmd/config.py index 3e622a9e..30b38215 100644 --- a/surfactant/cmd/config.py +++ b/surfactant/cmd/config.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Any, List, Optional import click @@ -31,7 +31,7 @@ def config(key: str, values: Optional[List[str]]): else: # Set the configuration value # Convert 'true' and 'false' strings to boolean - converted_values = [] + converted_values: List[Any] = [] for value in values: if value.lower() == "true": converted_values.append(True) diff --git a/surfactant/cmd/generate.py b/surfactant/cmd/generate.py index 386eae9f..99ccf88c 100644 --- a/surfactant/cmd/generate.py +++ b/surfactant/cmd/generate.py @@ -7,7 +7,7 @@ import pathlib import queue import re -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union import click from loguru import logger @@ -51,11 +51,11 @@ def get_software_entry( else: sw_entry.containerPath = [re.sub("^" + root_path, container_uuid + "/", filepath)] sw_entry.recordedInstitution = user_institution_name - sw_children = [] + sw_children: List[Software] = [] # for unsupported file types, details are just empty; this is the case for archive files (e.g. zip, tar, iso) # as well as intel hex or motorola s-rec files - extracted_info_results = pluginmanager.hook.extract_file_info( + extracted_info_results: List[object] = pluginmanager.hook.extract_file_info( sbom=parent_sbom, software=sw_entry, filename=filepath, @@ -66,8 +66,19 @@ def get_software_entry( ) # add metadata extracted from the file, and set SBOM fields if metadata has relevant info for file_details in extracted_info_results: + # None as details doesn't add any useful info... + if file_details is None: + continue + + # ensure metadata exists for the software entry + if sw_entry.metadata is None: + sw_entry.metadata = [] sw_entry.metadata.append(file_details) + # before checking for keys, make sure the file details object is a dictionary + if not isinstance(file_details, Dict): + continue + # common case is Windows PE file has these details under FileInfo, otherwise fallback default value is fine if "FileInfo" in file_details: fi = file_details["FileInfo"] @@ -90,6 +101,9 @@ def get_software_entry( if "revision_number" in file_details["ole"]: sw_entry.version = file_details["ole"]["revision_number"] if "author" in file_details["ole"]: + # ensure the vendor list has been created + if sw_entry.vendor is None: + sw_entry.vendor = [] sw_entry.vendor.append(file_details["ole"]["author"]) if "comments" in file_details["ole"]: sw_entry.comments = file_details["ole"]["comments"] @@ -149,9 +163,9 @@ def determine_install_prefix( Optional[str]: The install prefix to use, or 'NoneType' if an install path shouldn't be listed. """ install_prefix = None - if entry.installPrefix or entry.installPrefix == "": + if entry and (entry.installPrefix or entry.installPrefix == ""): install_prefix = entry.installPrefix - elif not skip_extract_path: + elif not skip_extract_path and extract_path is not None: # pathlib doesn't include the trailing slash epath = pathlib.Path(extract_path) if epath.is_file(): @@ -252,16 +266,16 @@ def get_default_from_config(option: str, fallback: Optional[Any] = None) -> Any: # Disable positional argument linter check -- could make keyword-only, but then defaults need to be set # pylint: disable-next=too-many-positional-arguments def sbom( - config_file, - sbom_outfile, - input_sbom, - skip_gather, - skip_relationships, - skip_install_path, - recorded_institution, - output_format, - input_format, - include_all_files, + config_file: str, + sbom_outfile: click.File, + input_sbom: click.File, + skip_gather: bool, + skip_relationships: bool, + skip_install_path: bool, + recorded_institution: str, + output_format: str, + input_format: str, + include_all_files: bool, ): """Generate a sbom configured in CONFIG_FILE and output to SBOM_OUTPUT. @@ -289,11 +303,13 @@ def sbom( if not validate_config(config): return - context = queue.Queue() + context: queue.Queue[ContextEntry] = queue.Queue() - for entry in config: - context.put(ContextEntry(**entry)) + for cfg_entry in config: + context.put(ContextEntry(**cfg_entry)) + # define the new_sbom variable type + new_sbom: SBOM if not input_sbom: new_sbom = SBOM() else: @@ -322,7 +338,11 @@ def sbom( user_institution_name=recorded_institution, ) archive_entry = new_sbom.find_software(parent_entry.sha256) - if Software.check_for_hash_collision(archive_entry, parent_entry): + if ( + archive_entry + and parent_entry + and Software.check_for_hash_collision(archive_entry, parent_entry) + ): logger.warning( f"Hash collision between {archive_entry.name} and {parent_entry.name}; unexpected results may occur" ) @@ -351,17 +371,20 @@ def sbom( ) entry.installPrefix = entry.installPrefix.replace("\\", "\\\\") - for epath in entry.extractPaths: + for epath_str in entry.extractPaths: # convert to pathlib.Path, ensures trailing "/" won't be present and some more consistent path formatting - epath = pathlib.Path(epath) + epath = pathlib.Path(epath_str) install_prefix = determine_install_prefix( entry, epath, skip_extract_path=skip_install_path ) logger.trace("Extracted Path: " + epath.as_posix()) + # variable used to track software entries to add to the SBOM + entries: List[Software] + # handle individual file case, since os.walk doesn't if epath.is_file(): - entries: List[Software] = [] + entries = [] filepath = epath.as_posix() # breakpoint() try: @@ -404,11 +427,11 @@ def sbom( ) dir_symlinks.append((install_source, install_dest)) - entries: List[Software] = [] - for f in files: + entries = [] + for file in files: # os.path.join will insert an OS specific separator between cdir and f # need to make sure that separator is a / and not a \ on windows - filepath = pathlib.Path(cdir, f).as_posix() + filepath = pathlib.Path(cdir, file).as_posix() # TODO: add CI tests for generating SBOMs in scenarios with symlinks... (and just generally more CI tests overall...) # Record symlink details but don't run info extractors on them if os.path.islink(filepath): @@ -488,6 +511,14 @@ def sbom( # Add symlinks to install paths and file names for software in new_sbom.software: + # ensure fileName, installPath, and metadata lists for the software entry have been created + # for a user supplied input SBOM, there are no guarantees + if software.fileName is None: + software.fileName = [] + if software.installPath is None: + software.installPath = [] + if software.metadata is None: + software.metadata = [] if software.sha256 in filename_symlinks: filename_symlinks_added = [] for filename in filename_symlinks[software.sha256]: @@ -511,6 +542,8 @@ def sbom( for software in new_sbom.software: # NOTE: this probably doesn't actually add any containerPath symlinks for paths in (software.containerPath, software.installPath): + if paths is None: + continue paths_to_add = [] for path in paths: for link_source, link_dest in dir_symlinks: @@ -521,10 +554,14 @@ def sbom( paths_to_add.append(path.replace(link_dest, link_source, 1)) if paths_to_add: found_md_installpathsymlinks = False - for md in software.metadata: - if "installPathSymlinks" in md: - found_md_installpathsymlinks = True - md["installPathSymlinks"] += paths_to_add + # make sure software.metadata list has been initialized + if software.metadata is None: + software.metadata = [] + if isinstance(software.metadata, Iterable): + for md in software.metadata: + if isinstance(md, Dict) and "installPathSymlinks" in md: + found_md_installpathsymlinks = True + md["installPathSymlinks"] += paths_to_add if not found_md_installpathsymlinks: software.metadata.append({"installPathSymlinks": paths_to_add}) paths += paths_to_add @@ -542,7 +579,7 @@ def sbom( def resolve_link( - path: str, cur_dir: str, extract_dir: str, install_prefix: str = None + path: str, cur_dir: str, extract_dir: str, install_prefix: Optional[str] = None ) -> Union[str, None]: assert cur_dir.startswith(extract_dir) # Links seen before diff --git a/surfactant/cmd/merge.py b/surfactant/cmd/merge.py index 5462c6d2..4d3c5a93 100644 --- a/surfactant/cmd/merge.py +++ b/surfactant/cmd/merge.py @@ -1,6 +1,7 @@ import json import uuid as uuid_module from collections import deque +from typing import Dict, List import click from loguru import logger @@ -83,7 +84,7 @@ def construct_relationship_graph(sbom: SBOM): sbom (SBOM): The sbom to generate relationship graph from. """ # construct a graph for adding a system relationship to all root software entries - rel_graph = {} + rel_graph: Dict[str, List[str]] = {} # add all UUIDs as nodes in the graph for system in sbom.systems: rel_graph[system.UUID] = [] diff --git a/surfactant/configmanager.py b/surfactant/configmanager.py index 3ad618ce..e532e5c2 100644 --- a/surfactant/configmanager.py +++ b/surfactant/configmanager.py @@ -2,7 +2,7 @@ import platform from pathlib import Path from threading import Lock -from typing import Any, Optional, Union +from typing import Any, Dict, Optional, Union import tomlkit @@ -19,7 +19,8 @@ class ConfigManager: config_file_path (Path): The path to the configuration file. """ - _instances = {} + _initialized: bool = False + _instances: Dict[str, "ConfigManager"] = {} _lock = Lock() def __new__( diff --git a/surfactant/infoextractors/elf_file.py b/surfactant/infoextractors/elf_file.py index 9e260693..250b2a05 100755 --- a/surfactant/infoextractors/elf_file.py +++ b/surfactant/infoextractors/elf_file.py @@ -52,7 +52,7 @@ def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: s } -def extract_elf_info(filename): +def extract_elf_info(filename: str) -> object: with open(filename, "rb") as f: try: elf = ELFFile(f) diff --git a/surfactant/infoextractors/java_file.py b/surfactant/infoextractors/java_file.py index 72e5a3e9..3b0cb41d 100644 --- a/surfactant/infoextractors/java_file.py +++ b/surfactant/infoextractors/java_file.py @@ -67,7 +67,7 @@ def handle_java_class(info: Dict[str, Any], class_info: javatools.JavaClassInfo) def extract_java_info(filename: str, filetype: str) -> object: - info = {"javaClasses": {}} + info: Dict[str, Any] = {"javaClasses": {}} if filetype in ("JAR", "EAR", "WAR"): with javatools.jarinfo.JarInfo(filename) as jarinfo: for class_ in jarinfo.get_classes(): diff --git a/surfactant/infoextractors/js_file.py b/surfactant/infoextractors/js_file.py index 1036ad77..780ef282 100644 --- a/surfactant/infoextractors/js_file.py +++ b/surfactant/infoextractors/js_file.py @@ -24,7 +24,7 @@ def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: s return extract_js_info(filename) -def extract_js_info(filename): +def extract_js_info(filename: str) -> object: js_info: Dict[str, Any] = {"jsLibraries": []} js_lib_file = pathlib.Path(__file__).parent / "js_library_patterns.json" diff --git a/surfactant/infoextractors/mach_o_file.py b/surfactant/infoextractors/mach_o_file.py index 531efdce..65a0738d 100644 --- a/surfactant/infoextractors/mach_o_file.py +++ b/surfactant/infoextractors/mach_o_file.py @@ -47,7 +47,7 @@ def extract_mach_o_info(filename: str) -> object: except OSError: return {} - file_details: Dict[str:Any] = {"OS": "MacOS", "numBinaries": binaries.size, "binaries": []} + file_details: Dict[str, Any] = {"OS": "MacOS", "numBinaries": binaries.size, "binaries": []} # Iterate over all binaries in the FAT binary for binary in binaries: diff --git a/surfactant/infoextractors/ole_file.py b/surfactant/infoextractors/ole_file.py index 9af4cc4b..3989a90f 100755 --- a/surfactant/infoextractors/ole_file.py +++ b/surfactant/infoextractors/ole_file.py @@ -21,7 +21,7 @@ def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: s return extract_ole_info(filename) -def extract_ole_info(filename): +def extract_ole_info(filename: str) -> object: file_details: Dict[str, Any] = {} ole = olefile.OleFileIO(filename) diff --git a/surfactant/infoextractors/pe_file.py b/surfactant/infoextractors/pe_file.py index 4d908203..7e7dabc8 100755 --- a/surfactant/infoextractors/pe_file.py +++ b/surfactant/infoextractors/pe_file.py @@ -10,7 +10,7 @@ import pathlib import re -from typing import Any, Dict +from typing import Any, Dict, List, Optional import defusedxml.ElementTree import dnfile @@ -79,8 +79,7 @@ def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: s } -def extract_pe_info(filename): - dnfile.fast_load = False +def extract_pe_info(filename: str) -> object: try: pe = dnfile.dnPE(filename, fast_load=False) except (OSError, dnfile.PEFormatError): @@ -169,7 +168,7 @@ def extract_pe_info(filename): assembly_refs.append(get_assemblyref_info(ar_info)) file_details["dotnetAssemblyRef"] = assembly_refs if implmap_info := getattr(dnet_mdtables, "ImplMap", None): - imp_modules = [] + imp_modules: List[Dict[str, Any]] = [] for im_info in implmap_info: insert_implmap_info(im_info, imp_modules) file_details["dotnetImplMap"] = imp_modules @@ -190,7 +189,7 @@ def extract_pe_info(filename): return file_details -def add_core_assembly_info(asm_dict, asm_info): +def add_core_assembly_info(asm_dict: Dict[str, Any], asm_info): # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L36-L39 # HeapItemString value will be decoded string, or None if there was a UnicodeDecodeError asm_dict["Name"] = asm_info.Name.value if asm_info.Name.value else asm_info.raw_data.hex() @@ -233,7 +232,7 @@ def add_assembly_flags_info(asm_dict, asm_info): } -def get_assembly_info(asm_info): +def get_assembly_info(asm_info) -> Dict[str, Any]: asm: Dict[str, Any] = {} add_core_assembly_info(asm, asm_info) # REFERENCE: https://github.com/malwarefrank/dnfile/blob/fcccdaf/src/dnfile/enums.py#L851-L863 @@ -243,7 +242,7 @@ def get_assembly_info(asm_info): return asm -def get_assemblyref_info(asmref_info): +def get_assemblyref_info(asmref_info) -> Dict[str, Any]: asmref: Dict[str, Any] = {} add_core_assembly_info(asmref, asmref_info) # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L62-L66 @@ -254,7 +253,7 @@ def get_assemblyref_info(asmref_info): return asmref -def insert_implmap_info(im_info, imp_modules): +def insert_implmap_info(im_info, imp_modules: List[Dict[str, Any]]): # REFERENCE: https://github.com/malwarefrank/dnfile/blob/096de1b3/src/dnfile/stream.py#L36-L39 # HeapItemString value will be decoded string, or None if there was a UnicodeDecodeError dllName = ( @@ -284,7 +283,7 @@ def get_xmlns_and_tag(uri): # check for manifest file on Windows (note: could also be a resource contained within an exe/dll) # return any info that could be useful for establishing "Uses" relationships later -def get_windows_manifest_info(filename): +def get_windows_manifest_info(filename: str) -> Optional[Dict[str, Any]]: binary_filepath = pathlib.Path(filename) manifest_filepath = binary_filepath.with_suffix(binary_filepath.suffix + ".manifest") if manifest_filepath.exists(): @@ -428,7 +427,7 @@ def get_assemblyBinding_info(ab_et, config_filepath=""): # DLL redirection summary: redirection file with name_of_exe.local (contents are ignored) makes a check for mydll.dll happen in the application directory first, # regardless of what the full path specified for LoadLibrary or LoadLibraryEx is (if no dll found in local directory, uses the typical search order) -def check_windows_dll_redirection_local(filename): +def check_windows_dll_redirection_local(filename: str): binary_filepath = pathlib.Path(filename) config_filepath = binary_filepath.with_suffix(binary_filepath.suffix + ".local") return config_filepath.exists() @@ -437,7 +436,7 @@ def check_windows_dll_redirection_local(filename): # check for an application configuration file and return (potentially) useful information # https://learn.microsoft.com/en-us/dotnet/framework/deployment/how-the-runtime-locates-assemblies#application-configuration-file # https://learn.microsoft.com/en-us/windows/win32/sbscs/application-configuration-files -def get_windows_application_config_info(filename): +def get_windows_application_config_info(filename: str): binary_filepath = pathlib.Path(filename) config_filepath = binary_filepath.with_suffix(binary_filepath.suffix + ".config") if config_filepath.exists(): diff --git a/surfactant/plugin/hookspecs.py b/surfactant/plugin/hookspecs.py index eb1fe304..84bf2fe4 100644 --- a/surfactant/plugin/hookspecs.py +++ b/surfactant/plugin/hookspecs.py @@ -38,7 +38,7 @@ def extract_file_info( context: "Queue[ContextEntry]", children: List[Software], include_all_files: bool, -) -> Optional[list]: +) -> object: """Extracts information from the given file to add to the given software entry. Return an object to be included as part of the metadata field, and potentially used as part of selecting default values for other Software entry fields. Returning `None` will not add @@ -92,6 +92,7 @@ def write_sbom(sbom: SBOM, outfile) -> None: @hookspec +# type: ignore[empty-body] def read_sbom(infile) -> SBOM: """Reads the contents of the input SBOM from the given input SBOM file. diff --git a/surfactant/plugin/manager.py b/surfactant/plugin/manager.py index ea408320..e1b75eda 100644 --- a/surfactant/plugin/manager.py +++ b/surfactant/plugin/manager.py @@ -80,8 +80,9 @@ def print_plugins(pm: pluggy.PluginManager): print("PLUGINS") for p in pm.get_plugins(): print("-------") - print("canonical name: " + pm.get_canonical_name(p)) - print("name: " + pm.get_name(p)) + print(f"canonical name: {pm.get_canonical_name(p)}") + plugin_name = pm.get_name(p) if pm.get_name(p) else "" + print(f"name: {plugin_name}") def find_io_plugin(pm: pluggy.PluginManager, io_format: str, function_name: str): diff --git a/surfactant/relationships/dotnet_relationship.py b/surfactant/relationships/dotnet_relationship.py index e313daed..40827320 100644 --- a/surfactant/relationships/dotnet_relationship.py +++ b/surfactant/relationships/dotnet_relationship.py @@ -95,7 +95,7 @@ def establish_relationships( if isinstance(e.installPath, Iterable): for ifile in e.installPath: if ref_abspath == pathlib.PureWindowsPath(ifile): - relationships.extend(Relationship(dependent_uuid, e.uuid, "Uses")) + relationships.append(Relationship(dependent_uuid, e.UUID, "Uses")) continue probedirs = [] diff --git a/surfactant/relationships/elf_relationship.py b/surfactant/relationships/elf_relationship.py index 919ad66f..5563574d 100644 --- a/surfactant/relationships/elf_relationship.py +++ b/surfactant/relationships/elf_relationship.py @@ -165,22 +165,22 @@ def substitute_all_dst(sw: Software, md, path) -> List[pathlib.PurePosixPath]: if isinstance(sw.installPath, Iterable): for ipath in sw.installPath: origin = pathlib.PurePosixPath(ipath).parent.as_posix() - pathlist.append(replace_dst(path, "ORIGIN", origin)) + pathlist.append(pathlib.PurePosixPath(replace_dst(path, "ORIGIN", origin))) # LIB: expands to `lib` or `lib64` depending on arch (x86-64 to lib64, x86-32 to lib) if (path.find("$LIB") != -1) or (path.find("${LIB}") != -1): if not pathlist: # nothing in the original pathlist, use the original path passed in - pathlist.append(replace_dst(path, "LIB", "lib")) - pathlist.append(replace_dst(path, "LIB", "lib64")) + pathlist.append(pathlib.PurePosixPath(replace_dst(path, "LIB", "lib"))) + pathlist.append(pathlib.PurePosixPath(replace_dst(path, "LIB", "lib64"))) else: # perform substitutions with every current entry in pathlist pathlist = [ newp for p in pathlist for newp in ( - replace_dst(p, "LIB", "lib"), - replace_dst(p, "LIB", "lib64"), + pathlib.PurePosixPath(replace_dst(p, "LIB", "lib")), + pathlib.PurePosixPath(replace_dst(p, "LIB", "lib64")), ) ] @@ -194,5 +194,5 @@ def substitute_all_dst(sw: Software, md, path) -> List[pathlib.PurePosixPath]: return [] # normalize paths after expanding tokens to avoid portions of the path involving ../, ./, and // occurrences - pathlist = [posix_normpath(p) for p in pathlist] + pathlist = [posix_normpath(p.as_posix()) for p in pathlist] return pathlist diff --git a/surfactant/relationships/java_relationship.py b/surfactant/relationships/java_relationship.py index 46e42153..46b4065d 100644 --- a/surfactant/relationships/java_relationship.py +++ b/surfactant/relationships/java_relationship.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Dict, List, Optional import surfactant.plugin from surfactant.sbomtypes import SBOM, Relationship, Software @@ -10,18 +10,19 @@ def has_required_fields(metadata) -> bool: class _ExportDict: created = False - supplied_by = {} + supplied_by: Dict[str, str] = {} @classmethod def create_export_dict(cls, sbom: SBOM): if cls.created: return for software_entry in sbom.software: - for metadata in software_entry.metadata: - if "javaClasses" in metadata: - for class_info in metadata["javaClasses"].values(): - for export in class_info["javaExports"]: - cls.supplied_by[export] = software_entry.UUID + if software_entry.metadata: + for metadata in software_entry.metadata: + if isinstance(metadata, Dict) and "javaClasses" in metadata: + for class_info in metadata["javaClasses"].values(): + for export in class_info["javaExports"]: + cls.supplied_by[export] = software_entry.UUID cls.created = True @classmethod diff --git a/surfactant/sbomtypes/_analysisdata.py b/surfactant/sbomtypes/_analysisdata.py index 0eac2e3b..90506d6d 100644 --- a/surfactant/sbomtypes/_analysisdata.py +++ b/surfactant/sbomtypes/_analysisdata.py @@ -14,10 +14,12 @@ @dataclass class AnalysisData: + # origin, testName, and testVersion are not optional + # user must provide the info, not really a "safe" default value + origin: str + testName: str + testVersion: str UUID: str = field(default_factory=lambda: str(uuid.uuid4())) - origin: str = None - testName: str = None - testVersion: str = None specificEnvironment: Optional[str] = None files: Optional[List[File]] = None linksToKnownVulnerabilities: Optional[str] = None diff --git a/surfactant/sbomtypes/_sbom.py b/surfactant/sbomtypes/_sbom.py index a4febce6..114cbc7d 100644 --- a/surfactant/sbomtypes/_sbom.py +++ b/surfactant/sbomtypes/_sbom.py @@ -56,7 +56,10 @@ def find_relationship(self, xUUID: str, yUUID: str, relationship: str) -> bool: return Relationship(xUUID, yUUID, relationship) in self.relationships def has_relationship( - self, xUUID: str = None, yUUID: str = None, relationship: str = None + self, + xUUID: Optional[str] = None, + yUUID: Optional[str] = None, + relationship: Optional[str] = None, ) -> bool: for rel in self.relationships: # We iterate until we find a relationship that meets all the conditions @@ -93,7 +96,7 @@ def add_software_entries( # if a software entry already exists with a matching file hash, augment the info in the existing entry for e in entries: existing_sw = self.find_software(e.sha256) - if Software.check_for_hash_collision(existing_sw, e): + if existing_sw and Software.check_for_hash_collision(existing_sw, e): logger.warning( f"Hash collision between {existing_sw.name} and {e.name}; unexpected results may occur" ) @@ -167,8 +170,7 @@ def create_software( self.software.append(sw) return sw - def merge(self, sbom_m: SBOM) -> SBOM: - # merged_sbom = SBOM() + def merge(self, sbom_m: SBOM): # merged/old to new UUID map uuid_updates = {} @@ -237,20 +239,20 @@ def merge(self, sbom_m: SBOM) -> SBOM: self.observations.append(observation) # merge starRelationships if sbom_m.starRelationships: - for rel in sbom_m.starRelationships: + for star_rel in sbom_m.starRelationships: # rewrite UUIDs before doing the search - if rel.xUUID in uuid_updates: - rel.xUUID = uuid_updates[rel.xUUID] - if rel.yUUID in uuid_updates: - rel.yUUID = uuid_updates[rel.yUUID] - if existing_rel := self._find_star_relationship_entry( - xUUID=rel.xUUID, - yUUID=rel.yUUID, - relationship=rel.relationship, + if star_rel.xUUID in uuid_updates: + star_rel.xUUID = uuid_updates[star_rel.xUUID] + if star_rel.yUUID in uuid_updates: + star_rel.yUUID = uuid_updates[star_rel.yUUID] + if existing_star_rel := self._find_star_relationship_entry( + xUUID=star_rel.xUUID, + yUUID=star_rel.yUUID, + relationship=star_rel.relationship, ): - logger.info(f"DUPLICATE STAR RELATIONSHIP: {existing_rel}") + logger.info(f"DUPLICATE STAR RELATIONSHIP: {existing_star_rel}") else: - self.starRelationships.add(rel) + self.starRelationships.add(star_rel) def _find_systems_entry( self, uuid: Optional[str] = None, name: Optional[str] = None