Skip to content

Commit

Permalink
[client] Support no_trigger_import in x_opencti_files
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelHassine committed Sep 19, 2023
1 parent 1b66a21 commit cf6b01d
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
autoapi==2.0.1
sphinx==7.2.5
sphinx==7.2.6
sphinx-autodoc-typehints==1.24.0
sphinx_rtd_theme==1.3.0
2 changes: 2 additions & 0 deletions pycti/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
OpenCTIConnectorHelper,
get_config_variable,
)
from .connector.opencti_metric_handler import OpenCTIMetricHandler
from .entities.opencti_attack_pattern import AttackPattern
from .entities.opencti_campaign import Campaign
from .entities.opencti_case_incident import CaseIncident
Expand Down Expand Up @@ -97,6 +98,7 @@
"OpenCTIApiWork",
"OpenCTIConnector",
"OpenCTIConnectorHelper",
"OpenCTIMetricHandler",
"OpenCTIStix2",
"OpenCTIStix2Splitter",
"OpenCTIStix2Update",
Expand Down
25 changes: 23 additions & 2 deletions pycti/connector/opencti_connector_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pycti.api.opencti_api_client import OpenCTIApiClient
from pycti.connector import LOGGER
from pycti.connector.opencti_connector import OpenCTIConnector
from pycti.connector.opencti_metric_handler import OpenCTIMetricHandler
from pycti.utils.opencti_stix2_splitter import OpenCTIStix2Splitter

TRUTHY: List[str] = ["yes", "true", "True"]
Expand Down Expand Up @@ -56,6 +57,7 @@ def get_config_variable(
:param yaml_path: path to yaml config
:param config: client config dict, defaults to {}
:param isNumber: specify if the variable is a number, defaults to False
:param default: default value
"""

if os.getenv(env_var) is not None:
Expand Down Expand Up @@ -267,10 +269,12 @@ def _data_handler(self, json_data) -> None:
)
return message
except Exception as e: # pylint: disable=broad-except
self.helper.metric.inc("error_count")
LOGGER.exception("Error in message processing, reporting error to API")
try:
self.helper.api.work.to_processed(work_id, str(e), True)
except: # pylint: disable=bare-except
self.helper.metric.inc("error_count")
LOGGER.error("Failing reporting the processing")

def run(self) -> None:
Expand Down Expand Up @@ -337,14 +341,15 @@ def on_channel_open(self, channel):


class PingAlive(threading.Thread):
def __init__(self, connector_id, api, get_state, set_state) -> None:
def __init__(self, connector_id, api, get_state, set_state, metric) -> None:
threading.Thread.__init__(self)
self.connector_id = connector_id
self.in_error = False
self.api = api
self.get_state = get_state
self.set_state = set_state
self.exit_event = threading.Event()
self.metric = metric

def ping(self) -> None:
while not self.exit_event.is_set():
Expand All @@ -366,8 +371,10 @@ def ping(self) -> None:
if self.in_error:
self.in_error = False
LOGGER.error("API Ping back to normal")
self.metric.inc("ping_api_count")
except Exception: # pylint: disable=broad-except
self.in_error = True
self.metric.inc("ping_api_error")
LOGGER.error("Error pinging the API")
self.exit_event.wait(40)

Expand Down Expand Up @@ -648,10 +655,19 @@ def __init__(self, config: Dict) -> None:
self.connect_validate_before_import = get_config_variable(
"CONNECTOR_VALIDATE_BEFORE_IMPORT",
["connector", "validate_before_import"],
)
# Start up the server to expose the metrics.
expose_metrics = get_config_variable(
"CONNECTOR_EXPOSE_METRICS",
["connector", "expose_metrics"],
config,
False,
False,
)
metrics_port = get_config_variable(
"CONNECTOR_METRICS_PORT", ["connector", "metrics_port"], config, True, 9095
)
self.metric = OpenCTIMetricHandler(expose_metrics, metrics_port)

# Configure logger
logging.basicConfig(level=self.log_level)
Expand Down Expand Up @@ -693,7 +709,7 @@ def __init__(self, config: Dict) -> None:
# Start ping thread
if not self.connect_run_and_terminate:
self.ping = PingAlive(
self.connector.id, self.api, self.get_state, self.set_state
self.connector.id, self.api, self.get_state, self.set_state, self.metric
)
self.ping.start()

Expand Down Expand Up @@ -787,6 +803,7 @@ def force_ping(self):
if initial_state != remote_state:
self.api.connector.ping(self.connector_id, initial_state)
except Exception: # pylint: disable=broad-except
self.metric.inc("error_count")
LOGGER.error("Error pinging the API")

def listen(self, message_callback: Callable[[Dict], str]) -> None:
Expand Down Expand Up @@ -974,6 +991,7 @@ def send_stix2_bundle(self, bundle, **kwargs) -> list:
bundles = stix2_splitter.split_bundle(bundle, True, event_version)

if len(bundles) == 0:
self.metric.inc("error_count")
raise ValueError("Nothing to import")

if work_id:
Expand Down Expand Up @@ -1060,8 +1078,11 @@ def _send_bundle(self, channel, bundle, **kwargs) -> None:
delivery_mode=2, content_encoding="utf-8" # make message persistent
),
)
logging.info("Bundle has been sent")
self.metric.inc("bundle_send")
except (UnroutableError, NackError) as e:
LOGGER.error("Unable to send bundle, retry...%s", e)
self.metric.inc("error_count")
self._send_bundle(channel, bundle, **kwargs)

def stix2_get_embedded_objects(self, item) -> Dict:
Expand Down
115 changes: 115 additions & 0 deletions pycti/connector/opencti_metric_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
from typing import Type, Union

from prometheus_client import Counter, Enum, start_http_server


class OpenCTIMetricHandler:
def __init__(self, activated: bool = False, port: int = 9095):
"""
Init of OpenCTIMetricHandler class.
Parameters
----------
activated : bool, default False
If True use metrics in client and connectors.
port : int, default 9095
Port for prometheus server.
"""
self.activated = activated

if self.activated:
logging.info(f"Exposing metrics on port {port}")
start_http_server(port)

self._metrics = {
"bundle_send": Counter(
"bundle_send",
"Number of bundle send",
),
"record_send": Counter(
"record_send",
"Number of record (objects per bundle) send",
),
"run_count": Counter(
"run_count",
"Number of run",
),
"ping_api_count": Counter(
"ping_api_count",
"Number of ping to the api",
),
"ping_api_error": Counter(
"ping_api_error",
"Number of error when pinging the api",
),
"error_count": Counter(
"error_count",
"Number of error",
),
"client_error_count": Counter(
"client_error_count",
"Number of client error",
),
"state": Enum(
"state", "State of connector", states=["idle", "running", "stopped"]
),
}

def _metric_exists(
self, name: str, expected_type: Union[Type[Counter], Type[Enum]]
) -> bool:
"""
Check if a metric exists and has the correct type.
If it does not, log an error and return False.
Parameters
----------
name : str
Name of the metric to check.
expected_type : Counter or Enum
Expected type of the metric.
Returns
-------
bool
True if the metric exists and is of the correct type else False.
"""
if name not in self._metrics:
logging.error(f"Metric {name} does not exist.")
return False
if not isinstance(self._metrics[name], expected_type):
logging.error(f"Metric {name} is not of expected type {expected_type}.")
return False
return True

def inc(self, name: str, n: int = 1):
"""
Increment the metric (counter) `name` by `n`.
Parameters
----------
name : str
Name of the metric to increment.
n : int, default 1
Increment the counter by `n`.
"""
if self.activated:
if self._metric_exists(name, Counter):
self._metrics[name].inc(n)

def state(self, state: str, name: str = "state"):
"""
Set the state `state` for metric `name`.
Parameters
----------
state : str
State to set.
name : str, default = "state"
Name of the metric to set.
"""
if self.activated:
if self._metric_exists(name, Enum):
self._metrics[name].state(state)
11 changes: 8 additions & 3 deletions pycti/entities/opencti_external_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,13 @@ def add_file(self, **kwargs):
file_name = kwargs.get("file_name", None)
data = kwargs.get("data", None)
mime_type = kwargs.get("mime_type", "text/plain")
no_trigger_import = kwargs.get("no_trigger_import", False)
if id is not None and file_name is not None:
final_file_name = os.path.basename(file_name)
query = """
mutation ExternalReferenceEdit($id: ID!, $file: Upload!) {
mutation ExternalReferenceEdit($id: ID!, $file: Upload!, $noTriggerImport: Boolean) {
externalReferenceEdit(id: $id) {
importPush(file: $file) {
importPush(file: $file, noTriggerImport: $noTriggerImport) {
id
name
}
Expand All @@ -268,7 +269,11 @@ def add_file(self, **kwargs):
)
return self.opencti.query(
query,
{"id": id, "file": (self.file(final_file_name, data, mime_type))},
{
"id": id,
"file": (self.file(final_file_name, data, mime_type)),
"noTriggerImport": no_trigger_import,
},
)
else:
LOGGER.error(
Expand Down
11 changes: 8 additions & 3 deletions pycti/entities/opencti_stix_cyber_observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,13 @@ def add_file(self, **kwargs):
file_name = kwargs.get("file_name", None)
data = kwargs.get("data", None)
mime_type = kwargs.get("mime_type", "text/plain")
no_trigger_import = kwargs.get("no_trigger_import", False)
if id is not None and file_name is not None:
final_file_name = os.path.basename(file_name)
query = """
mutation StixCyberObservableEdit($id: ID!, $file: Upload!) {
mutation StixCyberObservableEdit($id: ID!, $file: Upload!, $noTriggerImport: Boolean) {
stixCyberObservableEdit(id: $id) {
importPush(file: $file) {
importPush(file: $file, noTriggerImport: $noTriggerImport) {
id
name
}
Expand All @@ -504,7 +505,11 @@ def add_file(self, **kwargs):
)
return self.opencti.query(
query,
{"id": id, "file": (self.file(final_file_name, data, mime_type))},
{
"id": id,
"file": (self.file(final_file_name, data, mime_type)),
"noTriggerImport": no_trigger_import,
},
)
else:
LOGGER.error(
Expand Down
11 changes: 8 additions & 3 deletions pycti/entities/opencti_stix_domain_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,12 +802,13 @@ def add_file(self, **kwargs):
file_name = kwargs.get("file_name", None)
data = kwargs.get("data", None)
mime_type = kwargs.get("mime_type", "text/plain")
no_trigger_import = kwargs.get("no_trigger_import", False)
if id is not None and file_name is not None:
final_file_name = os.path.basename(file_name)
query = """
mutation StixDomainObjectEdit($id: ID!, $file: Upload!) {
mutation StixDomainObjectEdit($id: ID!, $file: Upload!, $noTriggerImport: Boolean) {
stixDomainObjectEdit(id: $id) {
importPush(file: $file) {
importPush(file: $file, noTriggerImport: $noTriggerImport) {
id
name
}
Expand All @@ -825,7 +826,11 @@ def add_file(self, **kwargs):
)
return self.opencti.query(
query,
{"id": id, "file": (self.file(final_file_name, data, mime_type))},
{
"id": id,
"file": (self.file(final_file_name, data, mime_type)),
"noTriggerImport": no_trigger_import,
},
)
else:
LOGGER.error(
Expand Down
Loading

0 comments on commit cf6b01d

Please sign in to comment.