Skip to content

Commit

Permalink
chg: [stix2 import] Better handling of the STIX2 Parser class arguments
Browse files Browse the repository at this point in the history
- Made the different arguments available with the
  command line feature part of the parsing method
  rather than setting them with the Parser init
  • Loading branch information
chrisr3d committed Aug 20, 2024
1 parent aba2a12 commit 8c7328c
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 80 deletions.
40 changes: 30 additions & 10 deletions misp_stix_converter/misp_stix_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,12 +687,12 @@ def stix_2_to_misp(filename: _files_type,
return {'errors': [f'{filename} - {error.__str__()}']}
parser, args = _get_stix2_parser(
_from_misp(bundle.objects), distribution, sharing_group_id,
title, producer, galaxies_as_tags, organisation_uuid,
cluster_distribution, cluster_sharing_group_id
title, producer, galaxies_as_tags, single_event,
organisation_uuid, cluster_distribution, cluster_sharing_group_id
)
stix_parser = parser(*args)
stix_parser = parser()
stix_parser.load_stix_bundle(bundle)
stix_parser.parse_stix_bundle(single_event)
stix_parser.parse_stix_bundle(**args)
if output_dir is None:
output_dir = filename.parent
if stix_parser.single_event:
Expand Down Expand Up @@ -731,12 +731,12 @@ def stix2_to_misp_instance(
return {'errors': [f'{filename} - {error.__str__()}']}
parser, args = _get_stix2_parser(
_from_misp(bundle.objects), distribution, sharing_group_id,
title, producer, galaxies_as_tags, organisation_uuid,
cluster_distribution, cluster_sharing_group_id
title, producer, galaxies_as_tags, single_event,
organisation_uuid, cluster_distribution, cluster_sharing_group_id
)
stix_parser = parser(*args)
stix_parser = parser()
stix_parser.load_stix_bundle(bundle)
stix_parser.parse_stix_bundle(single_event)
stix_parser.parse_stix_bundle(**args)
if stix_parser.single_event:
misp_event = misp.add_event(stix_parser.misp_event, pythonify=True)
if not isinstance(misp_event, MISPEvent):
Expand Down Expand Up @@ -773,9 +773,29 @@ def _from_misp(stix_objects):
return False


def _get_stix2_parser(from_misp: bool, *args: tuple) -> tuple:
def _get_stix2_parser(from_misp: bool, distribution: int,
sharing_group_id: Union[int, None],
title: Union[str, None], producer: Union[str, None],
galaxies_as_tags: bool, single_event: bool,
organisation_uuid: str, cluster_distribution: int,
cluster_sharing_group_id: Union[int, None]) -> tuple:
args = {
'distribution': distribution,
'galaxies_as_tags': galaxies_as_tags,
'producer': producer,
'sharing_group_id': sharing_group_id,
'single_event': single_event,
'title': title
}
if from_misp:
return InternalSTIX2toMISPParser, args[:-3]
return InternalSTIX2toMISPParser, args
args.update(
{
'cluster_distribution': cluster_distribution,
'cluster_sharing_group_id': cluster_sharing_group_id,
'organisation_uuid': organisation_uuid
}
)
return ExternalSTIX2toMISPParser, args


Expand Down
51 changes: 31 additions & 20 deletions misp_stix_converter/stix2misp/external_stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,8 @@


class ExternalSTIX2toMISPParser(STIX2toMISPParser):
def __init__(self, distribution: Optional[int] = 0,
sharing_group_id: Optional[int] = None,
title: Optional[str] = None,
producer: Optional[str] = None,
galaxies_as_tags: Optional[bool] = False,
organisation_uuid: Optional[str] = MISP_org_uuid,
cluster_distribution: Optional[int] = 0,
cluster_sharing_group_id: Optional[int] = None):
super().__init__(
distribution, sharing_group_id, title, producer, galaxies_as_tags
)
self._set_cluster_distribution(
self._sanitise_distribution(cluster_distribution),
self._sanitise_sharing_group_id(cluster_sharing_group_id)
)
self.__organisation_uuid = organisation_uuid
def __init__(self):
super().__init__()
self._mapping = ExternalSTIX2toMISPMapping
# parsers
self._attack_pattern_parser: ExternalSTIX2AttackPatternConverter
Expand All @@ -53,6 +39,21 @@ def __init__(self, distribution: Optional[int] = 0,
self._tool_parser: ExternalSTIX2ToolConverter
self._vulnerability_parser: ExternalSTIX2VulnerabilityConverter

def parse_stix_bundle(
self, cluster_distribution: Optional[int] = 0,
cluster_sharing_group_id: Optional[int] = None,
organisation_uuid: Optional[str] = MISP_org_uuid, **kwargs):
self._set_parameters(**kwargs)
self._set_cluster_distribution(
cluster_distribution, cluster_sharing_group_id
)
self.__organisation_uuid = organisation_uuid
self._parse_stix_bundle()

############################################################################
# PROPERTIES #
############################################################################

@property
def cluster_distribution(self) -> dict:
return self.__cluster_distribution
Expand All @@ -67,6 +68,10 @@ def observable_object_parser(self) -> STIX2ObservableObjectConverter:
def organisation_uuid(self) -> str:
return self.__organisation_uuid

############################################################################
# PARSER SETTERS #
############################################################################

def _set_attack_pattern_parser(self):
self._attack_pattern_parser = ExternalSTIX2AttackPatternConverter(self)

Expand All @@ -75,10 +80,16 @@ def _set_campaign_parser(self):

def _set_cluster_distribution(
self, distribution: int, sharing_group_id: Union[int, None]):
cluster_distribution = {'distribution': distribution}
if distribution == 4 and sharing_group_id is not None:
cluster_distribution['sharing_group_id'] = sharing_group_id
self.__cluster_distribution = cluster_distribution
cl_dis = {'distribution': self._sanitise_distribution(distribution)}
if distribution == 4:
if sharing_group_id is not None:
cl_dis['sharing_group_id'] = self._sanitise_sharing_group_id(
sharing_group_id
)
else:
cl_dis['distribution'] = 0
self._cluster_distribution_and_sharing_group_id_error()
self.__cluster_distribution = cl_dis

def _set_course_of_action_parser(self):
self._course_of_action_parser = ExternalSTIX2CourseOfActionConverter(self)
Expand Down
71 changes: 53 additions & 18 deletions misp_stix_converter/stix2misp/importparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
]
_DATA_PATH = Path(__file__).parents[1].resolve() / 'data'

_DEFAULT_DISTRIBUTION = 0

_VALID_DISTRIBUTIONS = (0, 1, 2, 3, 4)
_RFC_VERSIONS = (1, 3, 4, 5)
_UUIDv4 = UUID('76beed5f-7251-457e-8c2a-b45f7b589d3d')
Expand Down Expand Up @@ -71,29 +73,20 @@ def _load_json_file(path):


class STIXtoMISPParser(metaclass=ABCMeta):
def __init__(self, distribution: int, sharing_group_id: Union[int, None],
title: Union[str, None], producer: Union[str, None],
galaxies_as_tags: bool):
def __init__(self):
self._identifier: str
self.__distribution: int
self.__galaxies_as_tags: bool
self.__galaxy_feature: str
self.__producer: Union[str, None]
self.__relationship_types: dict
self.__sharing_group_id: Union[int, None]
self.__title: Union[str, None]

self._clusters: dict = {}
self._galaxies: dict = {}
self.__errors: defaultdict = defaultdict(set)
self.__warnings: defaultdict = defaultdict(set)
self.__distribution = self._sanitise_distribution(distribution)
self.__sharing_group_id = self._sanitise_sharing_group_id(
sharing_group_id
)
self.__title = title
self.__producer = producer
self.__galaxies_as_tags = self._sanitise_galaxies_as_tags(
galaxies_as_tags
)
if self.galaxies_as_tags:
self.__galaxy_feature = 'as_tag_names'
else:
self._galaxies: dict = {}
self.__galaxy_feature = 'as_container'
self.__replacement_uuids: dict = {}

def _sanitise_distribution(self, distribution: int) -> int:
Expand All @@ -107,7 +100,8 @@ def _sanitise_distribution(self, distribution: int) -> int:
self._distribution_value_error(sanitised)
return 0

def _sanitise_galaxies_as_tags(self, galaxies_as_tags: bool):
def _sanitise_galaxies_as_tags(
self, galaxies_as_tags: Union[bool, str, int]) -> bool:
if isinstance(galaxies_as_tags, bool):
return galaxies_as_tags
if galaxies_as_tags in ('true', 'True', '1', 1):
Expand All @@ -127,6 +121,32 @@ def _sanitise_sharing_group_id(
self._sharing_group_id_error(error)
return None

def _set_parameters(self, distribution: int = _DEFAULT_DISTRIBUTION,
sharing_group_id: Optional[int] = None,
galaxies_as_tags: Optional[bool] = False,
single_event: Optional[bool] = False,
producer: Optional[str] = None,
title: Optional[str] = None):
self.__distribution = self._sanitise_distribution(distribution)
self.__sharing_group_id = self._sanitise_sharing_group_id(
sharing_group_id
)
if self.sharing_group_id is None and self.distribution == 4:
self.__distribution = 0
self._distribution_and_sharing_group_id_error()
self.__galaxies_as_tags = self._sanitise_galaxies_as_tags(
galaxies_as_tags
)
self.__galaxy_feature = (
'as_tag_names' if self.galaxies_as_tags else 'as_container'
)
self.__single_event = single_event
self.__producer = producer
self.__title = title

def _set_single_event(self, single_event: bool):
self.__single_event = single_event

############################################################################
# PROPERTIES #
############################################################################
Expand Down Expand Up @@ -179,6 +199,10 @@ def replacement_uuids(self) -> dict:
def sharing_group_id(self) -> Union[int, None]:
return self.__sharing_group_id

@property
def single_event(self) -> bool:
return self.__single_event

@property
def synonyms_mapping(self) -> dict:
try:
Expand Down Expand Up @@ -208,6 +232,12 @@ def _attribute_from_pattern_parsing_error(self, indicator_id: str):
f'Error while parsing pattern from indicator with id {indicator_id}'
)

def _cluster_distribution_and_sharing_group_id_error(self):
self.__errors['init'].add(
'Invalid Cluster Sharing Group ID - '
'cannot be None when distribution is 4'
)

def _course_of_action_error(
self, course_of_action_id: str, exception: Exception):
self.__errors[self._identifier].add(
Expand All @@ -226,6 +256,11 @@ def _custom_object_error(self, custom_object_id: str, exception: Exception):
f'{custom_object_id}: {self._parse_traceback(exception)}'
)

def _distribution_and_sharing_group_id_error(self):
self.__errors['init'].add(
'Invalid Sharing Group ID - cannot be None when distribution is 4'
)

def _distribution_error(self, exception: Exception):
self.__errors['init'].add(
f'Wrong distribution format: {exception}'
Expand Down
22 changes: 14 additions & 8 deletions misp_stix_converter/stix2misp/internal_stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,8 @@


class InternalSTIX2toMISPParser(STIX2toMISPParser):
def __init__(self, distribution: Optional[int] = 0,
sharing_group_id: Optional[int] = None,
title: Optional[str] = None,
producer: Optional[str] = None,
galaxies_as_tags: Optional[bool] = False):
super().__init__(
distribution, sharing_group_id, title, producer, galaxies_as_tags
)
def __init__(self):
super().__init__()
self._mapping = InternalSTIX2toMISPMapping
# parsers
self._attack_pattern_parser: InternalSTIX2AttackPatternConverter
Expand All @@ -52,6 +46,14 @@ def __init__(self, distribution: Optional[int] = 0,
self._tool_parser: InternalSTIX2ToolConverter
self._vulnerability_parser: InternalSTIX2VulnerabilityConverter

def parse_stix_bundle(self, **kwargs):
self._set_parameters(**kwargs)
self._parse_stix_bundle()

############################################################################
# PROPERTIES #
############################################################################

@property
def custom_object_parser(self) -> STIX2CustomObjectConverter:
if not hasattr(self, '_custom_object_parser'):
Expand All @@ -70,6 +72,10 @@ def observed_data_parser(self) -> InternalSTIX2ObservedDataConverter:
self, '_observed_data_parser', self._set_observed_data_parser()
)

############################################################################
# PARSER SETTERS #
############################################################################

def _set_attack_pattern_parser(self) -> InternalSTIX2AttackPatternConverter:
self._attack_pattern_parser = InternalSTIX2AttackPatternConverter(self)

Expand Down
39 changes: 15 additions & 24 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,8 @@


class STIX2toMISPParser(STIXtoMISPParser, metaclass=ABCMeta):
def __init__(self, distribution: int, sharing_group_id: Union[int, None],
title: Union[str, None], producer: Union[str, None],
galaxies_as_tags: bool):
super().__init__(
distribution, sharing_group_id, title, producer, galaxies_as_tags
)
def __init__(self):
super().__init__()
self._creators: set = set()
self._mapping: Union[
ExternalSTIX2toMISPMapping, InternalSTIX2toMISPMapping
Expand Down Expand Up @@ -259,8 +255,17 @@ def load_stix_bundle(self, bundle: Union[Bundle_v20, Bundle_v21]):
self._critical_error(exception)
self.__n_report = 2 if n_report >= 2 else n_report

def parse_stix_bundle(self, single_event: Optional[bool] = False):
self.__single_event = single_event
def parse_stix_content(
self, filename: str, single_event: Optional[bool] = False):
try:
bundle = _load_stix2_content(filename)
except Exception as exception:
sys.exit(exception)
self.load_stix_bundle(bundle)
del bundle
self.parse_stix_bundle(single_event)

def _parse_stix_bundle(self):
try:
feature = self._mapping.bundle_to_misp_mapping(str(self.__n_report))
except AttributeError:
Expand All @@ -279,16 +284,6 @@ def parse_stix_bundle(self, single_event: Optional[bool] = False):
if hasattr(self, feature):
setattr(self, feature, {})

def parse_stix_content(
self, filename: str, single_event: Optional[bool] = False):
try:
bundle = _load_stix2_content(filename)
except Exception as exception:
sys.exit(exception)
self.load_stix_bundle(bundle)
del bundle
self.parse_stix_bundle(single_event)

############################################################################
# PROPERTIES #
############################################################################
Expand Down Expand Up @@ -378,10 +373,6 @@ def observed_data_parser(self) -> _OBSERVED_DATA_PARSER_TYPING:
self._set_observed_data_parser()
return self._observed_data_parser

@property
def single_event(self) -> bool:
return self.__single_event

@property
def stix_version(self) -> str:
return self.__stix_version
Expand Down Expand Up @@ -685,13 +676,13 @@ def _parse_bundle_with_multiple_reports(self):
self.__misp_events.append(self.misp_event)

def _parse_bundle_with_no_report(self):
self.__single_event = True
self._set_single_event(True)
self.__misp_event = self._create_generic_event()
self._parse_loaded_features()
self._handle_unparsed_content()

def _parse_bundle_with_single_report(self):
self.__single_event = True
self._set_single_event(True)
if hasattr(self, '_report') and self._report is not None:
for report in self._report.values():
self.__misp_event = self._misp_event_from_report(report)
Expand Down

0 comments on commit 8c7328c

Please sign in to comment.