diff --git a/misp_stix_converter/misp_stix_converter.py b/misp_stix_converter/misp_stix_converter.py index 08918c7..3f0fb60 100644 --- a/misp_stix_converter/misp_stix_converter.py +++ b/misp_stix_converter/misp_stix_converter.py @@ -687,12 +687,12 @@ def stix_2_to_misp(filename: _files_type, return {'errors': [f'{filename} - {error.__str__()}']} parser, args = _get_stix2_parser( _from_misp(bundle.objects), distribution, sharing_group_id, - title, producer, galaxies_as_tags, organisation_uuid, - cluster_distribution, cluster_sharing_group_id + title, producer, galaxies_as_tags, single_event, + organisation_uuid, cluster_distribution, cluster_sharing_group_id ) - stix_parser = parser(*args) + stix_parser = parser() stix_parser.load_stix_bundle(bundle) - stix_parser.parse_stix_bundle(single_event) + stix_parser.parse_stix_bundle(**args) if output_dir is None: output_dir = filename.parent if stix_parser.single_event: @@ -731,12 +731,12 @@ def stix2_to_misp_instance( return {'errors': [f'{filename} - {error.__str__()}']} parser, args = _get_stix2_parser( _from_misp(bundle.objects), distribution, sharing_group_id, - title, producer, galaxies_as_tags, organisation_uuid, - cluster_distribution, cluster_sharing_group_id + title, producer, galaxies_as_tags, single_event, + organisation_uuid, cluster_distribution, cluster_sharing_group_id ) - stix_parser = parser(*args) + stix_parser = parser() stix_parser.load_stix_bundle(bundle) - stix_parser.parse_stix_bundle(single_event) + stix_parser.parse_stix_bundle(**args) if stix_parser.single_event: misp_event = misp.add_event(stix_parser.misp_event, pythonify=True) if not isinstance(misp_event, MISPEvent): @@ -773,9 +773,29 @@ def _from_misp(stix_objects): return False -def _get_stix2_parser(from_misp: bool, *args: tuple) -> tuple: +def _get_stix2_parser(from_misp: bool, distribution: int, + sharing_group_id: Union[int, None], + title: Union[str, None], producer: Union[str, None], + galaxies_as_tags: bool, single_event: bool, + organisation_uuid: str, cluster_distribution: int, + cluster_sharing_group_id: Union[int, None]) -> tuple: + args = { + 'distribution': distribution, + 'galaxies_as_tags': galaxies_as_tags, + 'producer': producer, + 'sharing_group_id': sharing_group_id, + 'single_event': single_event, + 'title': title + } if from_misp: - return InternalSTIX2toMISPParser, args[:-3] + return InternalSTIX2toMISPParser, args + args.update( + { + 'cluster_distribution': cluster_distribution, + 'cluster_sharing_group_id': cluster_sharing_group_id, + 'organisation_uuid': organisation_uuid + } + ) return ExternalSTIX2toMISPParser, args diff --git a/misp_stix_converter/stix2misp/external_stix2_to_misp.py b/misp_stix_converter/stix2misp/external_stix2_to_misp.py index 7d238d8..997a96d 100644 --- a/misp_stix_converter/stix2misp/external_stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/external_stix2_to_misp.py @@ -20,22 +20,8 @@ class ExternalSTIX2toMISPParser(STIX2toMISPParser): - def __init__(self, distribution: Optional[int] = 0, - sharing_group_id: Optional[int] = None, - title: Optional[str] = None, - producer: Optional[str] = None, - galaxies_as_tags: Optional[bool] = False, - organisation_uuid: Optional[str] = MISP_org_uuid, - cluster_distribution: Optional[int] = 0, - cluster_sharing_group_id: Optional[int] = None): - super().__init__( - distribution, sharing_group_id, title, producer, galaxies_as_tags - ) - self._set_cluster_distribution( - self._sanitise_distribution(cluster_distribution), - self._sanitise_sharing_group_id(cluster_sharing_group_id) - ) - self.__organisation_uuid = organisation_uuid + def __init__(self): + super().__init__() self._mapping = ExternalSTIX2toMISPMapping # parsers self._attack_pattern_parser: ExternalSTIX2AttackPatternConverter @@ -53,6 +39,21 @@ def __init__(self, distribution: Optional[int] = 0, self._tool_parser: ExternalSTIX2ToolConverter self._vulnerability_parser: ExternalSTIX2VulnerabilityConverter + def parse_stix_bundle( + self, cluster_distribution: Optional[int] = 0, + cluster_sharing_group_id: Optional[int] = None, + organisation_uuid: Optional[str] = MISP_org_uuid, **kwargs): + self._set_parameters(**kwargs) + self._set_cluster_distribution( + cluster_distribution, cluster_sharing_group_id + ) + self.__organisation_uuid = organisation_uuid + self._parse_stix_bundle() + + ############################################################################ + # PROPERTIES # + ############################################################################ + @property def cluster_distribution(self) -> dict: return self.__cluster_distribution @@ -67,6 +68,10 @@ def observable_object_parser(self) -> STIX2ObservableObjectConverter: def organisation_uuid(self) -> str: return self.__organisation_uuid + ############################################################################ + # PARSER SETTERS # + ############################################################################ + def _set_attack_pattern_parser(self): self._attack_pattern_parser = ExternalSTIX2AttackPatternConverter(self) @@ -75,10 +80,16 @@ def _set_campaign_parser(self): def _set_cluster_distribution( self, distribution: int, sharing_group_id: Union[int, None]): - cluster_distribution = {'distribution': distribution} - if distribution == 4 and sharing_group_id is not None: - cluster_distribution['sharing_group_id'] = sharing_group_id - self.__cluster_distribution = cluster_distribution + cl_dis = {'distribution': self._sanitise_distribution(distribution)} + if distribution == 4: + if sharing_group_id is not None: + cl_dis['sharing_group_id'] = self._sanitise_sharing_group_id( + sharing_group_id + ) + else: + cl_dis['distribution'] = 0 + self._cluster_distribution_and_sharing_group_id_error() + self.__cluster_distribution = cl_dis def _set_course_of_action_parser(self): self._course_of_action_parser = ExternalSTIX2CourseOfActionConverter(self) diff --git a/misp_stix_converter/stix2misp/importparser.py b/misp_stix_converter/stix2misp/importparser.py index 31ddaf6..27e8199 100644 --- a/misp_stix_converter/stix2misp/importparser.py +++ b/misp_stix_converter/stix2misp/importparser.py @@ -23,6 +23,8 @@ ] _DATA_PATH = Path(__file__).parents[1].resolve() / 'data' +_DEFAULT_DISTRIBUTION = 0 + _VALID_DISTRIBUTIONS = (0, 1, 2, 3, 4) _RFC_VERSIONS = (1, 3, 4, 5) _UUIDv4 = UUID('76beed5f-7251-457e-8c2a-b45f7b589d3d') @@ -71,29 +73,20 @@ def _load_json_file(path): class STIXtoMISPParser(metaclass=ABCMeta): - def __init__(self, distribution: int, sharing_group_id: Union[int, None], - title: Union[str, None], producer: Union[str, None], - galaxies_as_tags: bool): + def __init__(self): self._identifier: str + self.__distribution: int + self.__galaxies_as_tags: bool + self.__galaxy_feature: str + self.__producer: Union[str, None] self.__relationship_types: dict + self.__sharing_group_id: Union[int, None] + self.__title: Union[str, None] self._clusters: dict = {} + self._galaxies: dict = {} self.__errors: defaultdict = defaultdict(set) self.__warnings: defaultdict = defaultdict(set) - self.__distribution = self._sanitise_distribution(distribution) - self.__sharing_group_id = self._sanitise_sharing_group_id( - sharing_group_id - ) - self.__title = title - self.__producer = producer - self.__galaxies_as_tags = self._sanitise_galaxies_as_tags( - galaxies_as_tags - ) - if self.galaxies_as_tags: - self.__galaxy_feature = 'as_tag_names' - else: - self._galaxies: dict = {} - self.__galaxy_feature = 'as_container' self.__replacement_uuids: dict = {} def _sanitise_distribution(self, distribution: int) -> int: @@ -107,7 +100,8 @@ def _sanitise_distribution(self, distribution: int) -> int: self._distribution_value_error(sanitised) return 0 - def _sanitise_galaxies_as_tags(self, galaxies_as_tags: bool): + def _sanitise_galaxies_as_tags( + self, galaxies_as_tags: Union[bool, str, int]) -> bool: if isinstance(galaxies_as_tags, bool): return galaxies_as_tags if galaxies_as_tags in ('true', 'True', '1', 1): @@ -127,6 +121,32 @@ def _sanitise_sharing_group_id( self._sharing_group_id_error(error) return None + def _set_parameters(self, distribution: int = _DEFAULT_DISTRIBUTION, + sharing_group_id: Optional[int] = None, + galaxies_as_tags: Optional[bool] = False, + single_event: Optional[bool] = False, + producer: Optional[str] = None, + title: Optional[str] = None): + self.__distribution = self._sanitise_distribution(distribution) + self.__sharing_group_id = self._sanitise_sharing_group_id( + sharing_group_id + ) + if self.sharing_group_id is None and self.distribution == 4: + self.__distribution = 0 + self._distribution_and_sharing_group_id_error() + self.__galaxies_as_tags = self._sanitise_galaxies_as_tags( + galaxies_as_tags + ) + self.__galaxy_feature = ( + 'as_tag_names' if self.galaxies_as_tags else 'as_container' + ) + self.__single_event = single_event + self.__producer = producer + self.__title = title + + def _set_single_event(self, single_event: bool): + self.__single_event = single_event + ############################################################################ # PROPERTIES # ############################################################################ @@ -179,6 +199,10 @@ def replacement_uuids(self) -> dict: def sharing_group_id(self) -> Union[int, None]: return self.__sharing_group_id + @property + def single_event(self) -> bool: + return self.__single_event + @property def synonyms_mapping(self) -> dict: try: @@ -208,6 +232,12 @@ def _attribute_from_pattern_parsing_error(self, indicator_id: str): f'Error while parsing pattern from indicator with id {indicator_id}' ) + def _cluster_distribution_and_sharing_group_id_error(self): + self.__errors['init'].add( + 'Invalid Cluster Sharing Group ID - ' + 'cannot be None when distribution is 4' + ) + def _course_of_action_error( self, course_of_action_id: str, exception: Exception): self.__errors[self._identifier].add( @@ -226,6 +256,11 @@ def _custom_object_error(self, custom_object_id: str, exception: Exception): f'{custom_object_id}: {self._parse_traceback(exception)}' ) + def _distribution_and_sharing_group_id_error(self): + self.__errors['init'].add( + 'Invalid Sharing Group ID - cannot be None when distribution is 4' + ) + def _distribution_error(self, exception: Exception): self.__errors['init'].add( f'Wrong distribution format: {exception}' diff --git a/misp_stix_converter/stix2misp/internal_stix2_to_misp.py b/misp_stix_converter/stix2misp/internal_stix2_to_misp.py index 0fb8868..4b921c0 100644 --- a/misp_stix_converter/stix2misp/internal_stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/internal_stix2_to_misp.py @@ -26,14 +26,8 @@ class InternalSTIX2toMISPParser(STIX2toMISPParser): - def __init__(self, distribution: Optional[int] = 0, - sharing_group_id: Optional[int] = None, - title: Optional[str] = None, - producer: Optional[str] = None, - galaxies_as_tags: Optional[bool] = False): - super().__init__( - distribution, sharing_group_id, title, producer, galaxies_as_tags - ) + def __init__(self): + super().__init__() self._mapping = InternalSTIX2toMISPMapping # parsers self._attack_pattern_parser: InternalSTIX2AttackPatternConverter @@ -52,6 +46,14 @@ def __init__(self, distribution: Optional[int] = 0, self._tool_parser: InternalSTIX2ToolConverter self._vulnerability_parser: InternalSTIX2VulnerabilityConverter + def parse_stix_bundle(self, **kwargs): + self._set_parameters(**kwargs) + self._parse_stix_bundle() + + ############################################################################ + # PROPERTIES # + ############################################################################ + @property def custom_object_parser(self) -> STIX2CustomObjectConverter: if not hasattr(self, '_custom_object_parser'): @@ -70,6 +72,10 @@ def observed_data_parser(self) -> InternalSTIX2ObservedDataConverter: self, '_observed_data_parser', self._set_observed_data_parser() ) + ############################################################################ + # PARSER SETTERS # + ############################################################################ + def _set_attack_pattern_parser(self) -> InternalSTIX2AttackPatternConverter: self._attack_pattern_parser = InternalSTIX2AttackPatternConverter(self) diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index 80bf1c3..4117db0 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -202,12 +202,8 @@ class STIX2toMISPParser(STIXtoMISPParser, metaclass=ABCMeta): - def __init__(self, distribution: int, sharing_group_id: Union[int, None], - title: Union[str, None], producer: Union[str, None], - galaxies_as_tags: bool): - super().__init__( - distribution, sharing_group_id, title, producer, galaxies_as_tags - ) + def __init__(self): + super().__init__() self._creators: set = set() self._mapping: Union[ ExternalSTIX2toMISPMapping, InternalSTIX2toMISPMapping @@ -259,8 +255,17 @@ def load_stix_bundle(self, bundle: Union[Bundle_v20, Bundle_v21]): self._critical_error(exception) self.__n_report = 2 if n_report >= 2 else n_report - def parse_stix_bundle(self, single_event: Optional[bool] = False): - self.__single_event = single_event + def parse_stix_content( + self, filename: str, single_event: Optional[bool] = False): + try: + bundle = _load_stix2_content(filename) + except Exception as exception: + sys.exit(exception) + self.load_stix_bundle(bundle) + del bundle + self.parse_stix_bundle(single_event) + + def _parse_stix_bundle(self): try: feature = self._mapping.bundle_to_misp_mapping(str(self.__n_report)) except AttributeError: @@ -279,16 +284,6 @@ def parse_stix_bundle(self, single_event: Optional[bool] = False): if hasattr(self, feature): setattr(self, feature, {}) - def parse_stix_content( - self, filename: str, single_event: Optional[bool] = False): - try: - bundle = _load_stix2_content(filename) - except Exception as exception: - sys.exit(exception) - self.load_stix_bundle(bundle) - del bundle - self.parse_stix_bundle(single_event) - ############################################################################ # PROPERTIES # ############################################################################ @@ -378,10 +373,6 @@ def observed_data_parser(self) -> _OBSERVED_DATA_PARSER_TYPING: self._set_observed_data_parser() return self._observed_data_parser - @property - def single_event(self) -> bool: - return self.__single_event - @property def stix_version(self) -> str: return self.__stix_version @@ -685,13 +676,13 @@ def _parse_bundle_with_multiple_reports(self): self.__misp_events.append(self.misp_event) def _parse_bundle_with_no_report(self): - self.__single_event = True + self._set_single_event(True) self.__misp_event = self._create_generic_event() self._parse_loaded_features() self._handle_unparsed_content() def _parse_bundle_with_single_report(self): - self.__single_event = True + self._set_single_event(True) if hasattr(self, '_report') and self._report is not None: for report in self._report.values(): self.__misp_event = self._misp_event_from_report(report)