Skip to content

Commit

Permalink
[client] add init of prometheus and OpenCTIMetricHandler (#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
axelfahy authored Sep 19, 2023
1 parent feb88a0 commit d87077d
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pycti/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
OpenCTIConnectorHelper,
get_config_variable,
)
from .connector.opencti_metric_handler import OpenCTIMetricHandler
from .entities.opencti_attack_pattern import AttackPattern
from .entities.opencti_campaign import Campaign
from .entities.opencti_case_incident import CaseIncident
Expand Down Expand Up @@ -97,6 +98,7 @@
"OpenCTIApiWork",
"OpenCTIConnector",
"OpenCTIConnectorHelper",
"OpenCTIMetricHandler",
"OpenCTIStix2",
"OpenCTIStix2Splitter",
"OpenCTIStix2Update",
Expand Down
25 changes: 23 additions & 2 deletions pycti/connector/opencti_connector_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pycti.api.opencti_api_client import OpenCTIApiClient
from pycti.connector import LOGGER
from pycti.connector.opencti_connector import OpenCTIConnector
from pycti.connector.opencti_metric_handler import OpenCTIMetricHandler
from pycti.utils.opencti_stix2_splitter import OpenCTIStix2Splitter

TRUTHY: List[str] = ["yes", "true", "True"]
Expand Down Expand Up @@ -56,6 +57,7 @@ def get_config_variable(
:param yaml_path: path to yaml config
:param config: client config dict, defaults to {}
:param isNumber: specify if the variable is a number, defaults to False
:param default: default value
"""

if os.getenv(env_var) is not None:
Expand Down Expand Up @@ -267,10 +269,12 @@ def _data_handler(self, json_data) -> None:
)
return message
except Exception as e: # pylint: disable=broad-except
self.helper.metric.inc("error_count")
LOGGER.exception("Error in message processing, reporting error to API")
try:
self.helper.api.work.to_processed(work_id, str(e), True)
except: # pylint: disable=bare-except
self.helper.metric.inc("error_count")
LOGGER.error("Failing reporting the processing")

def run(self) -> None:
Expand Down Expand Up @@ -337,14 +341,15 @@ def on_channel_open(self, channel):


class PingAlive(threading.Thread):
def __init__(self, connector_id, api, get_state, set_state) -> None:
def __init__(self, connector_id, api, get_state, set_state, metric) -> None:
threading.Thread.__init__(self)
self.connector_id = connector_id
self.in_error = False
self.api = api
self.get_state = get_state
self.set_state = set_state
self.exit_event = threading.Event()
self.metric = metric

def ping(self) -> None:
while not self.exit_event.is_set():
Expand All @@ -366,8 +371,10 @@ def ping(self) -> None:
if self.in_error:
self.in_error = False
LOGGER.error("API Ping back to normal")
self.metric.inc("ping_api_count")
except Exception: # pylint: disable=broad-except
self.in_error = True
self.metric.inc("ping_api_error")
LOGGER.error("Error pinging the API")
self.exit_event.wait(40)

Expand Down Expand Up @@ -648,10 +655,19 @@ def __init__(self, config: Dict) -> None:
self.connect_validate_before_import = get_config_variable(
"CONNECTOR_VALIDATE_BEFORE_IMPORT",
["connector", "validate_before_import"],
)
# Start up the server to expose the metrics.
expose_metrics = get_config_variable(
"CONNECTOR_EXPOSE_METRICS",
["connector", "expose_metrics"],
config,
False,
False,
)
metrics_port = get_config_variable(
"CONNECTOR_METRICS_PORT", ["connector", "metrics_port"], config, True, 9095
)
self.metric = OpenCTIMetricHandler(expose_metrics, metrics_port)

# Configure logger
logging.basicConfig(level=self.log_level)
Expand Down Expand Up @@ -693,7 +709,7 @@ def __init__(self, config: Dict) -> None:
# Start ping thread
if not self.connect_run_and_terminate:
self.ping = PingAlive(
self.connector.id, self.api, self.get_state, self.set_state
self.connector.id, self.api, self.get_state, self.set_state, self.metric
)
self.ping.start()

Expand Down Expand Up @@ -787,6 +803,7 @@ def force_ping(self):
if initial_state != remote_state:
self.api.connector.ping(self.connector_id, initial_state)
except Exception: # pylint: disable=broad-except
self.metric.inc("error_count")
LOGGER.error("Error pinging the API")

def listen(self, message_callback: Callable[[Dict], str]) -> None:
Expand Down Expand Up @@ -974,6 +991,7 @@ def send_stix2_bundle(self, bundle, **kwargs) -> list:
bundles = stix2_splitter.split_bundle(bundle, True, event_version)

if len(bundles) == 0:
self.metric.inc("error_count")
raise ValueError("Nothing to import")

if work_id:
Expand Down Expand Up @@ -1060,8 +1078,11 @@ def _send_bundle(self, channel, bundle, **kwargs) -> None:
delivery_mode=2, content_encoding="utf-8" # make message persistent
),
)
logging.info("Bundle has been sent")
self.metric.inc("bundle_send")
except (UnroutableError, NackError) as e:
LOGGER.error("Unable to send bundle, retry...%s", e)
self.metric.inc("error_count")
self._send_bundle(channel, bundle, **kwargs)

def stix2_get_embedded_objects(self, item) -> Dict:
Expand Down
115 changes: 115 additions & 0 deletions pycti/connector/opencti_metric_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
from typing import Type, Union

from prometheus_client import Counter, Enum, start_http_server


class OpenCTIMetricHandler:
def __init__(self, activated: bool = False, port: int = 9095):
"""
Init of OpenCTIMetricHandler class.
Parameters
----------
activated : bool, default False
If True use metrics in client and connectors.
port : int, default 9095
Port for prometheus server.
"""
self.activated = activated

if self.activated:
logging.info(f"Exposing metrics on port {port}")
start_http_server(port)

self._metrics = {
"bundle_send": Counter(
"bundle_send",
"Number of bundle send",
),
"record_send": Counter(
"record_send",
"Number of record (objects per bundle) send",
),
"run_count": Counter(
"run_count",
"Number of run",
),
"ping_api_count": Counter(
"ping_api_count",
"Number of ping to the api",
),
"ping_api_error": Counter(
"ping_api_error",
"Number of error when pinging the api",
),
"error_count": Counter(
"error_count",
"Number of error",
),
"client_error_count": Counter(
"client_error_count",
"Number of client error",
),
"state": Enum(
"state", "State of connector", states=["idle", "running", "stopped"]
),
}

def _metric_exists(
self, name: str, expected_type: Union[Type[Counter], Type[Enum]]
) -> bool:
"""
Check if a metric exists and has the correct type.
If it does not, log an error and return False.
Parameters
----------
name : str
Name of the metric to check.
expected_type : Counter or Enum
Expected type of the metric.
Returns
-------
bool
True if the metric exists and is of the correct type else False.
"""
if name not in self._metrics:
logging.error(f"Metric {name} does not exist.")
return False
if not isinstance(self._metrics[name], expected_type):
logging.error(f"Metric {name} is not of expected type {expected_type}.")
return False
return True

def inc(self, name: str, n: int = 1):
"""
Increment the metric (counter) `name` by `n`.
Parameters
----------
name : str
Name of the metric to increment.
n : int, default 1
Increment the counter by `n`.
"""
if self.activated:
if self._metric_exists(name, Counter):
self._metrics[name].inc(n)

def state(self, state: str, name: str = "state"):
"""
Set the state `state` for metric `name`.
Parameters
----------
state : str
State to set.
name : str, default = "state"
Name of the metric to set.
"""
if self.activated:
if self._metric_exists(name, Enum):
self._metrics[name].state(state)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ requests~=2.31.0
setuptools~=68.2.1
filigran-sseclient~=1.0.0
stix2~=3.0.1
cachetools~=5.3.0
cachetools~=5.3.0
prometheus-client~=0.13.1
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ install_requires =
datefinder~=0.7.3
pika~=1.3.1
python-magic~=0.4.27; sys_platform == "linux" or sys_platform == "darwin"
prometheus-client~=0.13.1
python-magic-bin~=0.4.14; sys_platform == "win32"
python_json_logger~=2.0.4
pyyaml~=6.0
Expand Down
Empty file.
13 changes: 13 additions & 0 deletions tests/01-unit/metric/test_opencti_metric_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from unittest import TestCase

from prometheus_client import Counter, Enum

from pycti import OpenCTIMetricHandler


class TestOpenCTIMetricHandler(TestCase):
def test_metric_exists(self):
metric = OpenCTIMetricHandler(activated=True)
self.assertTrue(metric._metric_exists("error_count", Counter))
self.assertFalse(metric._metric_exists("error_count", Enum))
self.assertFalse(metric._metric_exists("best_metric_count", Counter))
2 changes: 2 additions & 0 deletions tests/cases/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def __init__(self, config_file_path: str, api_client: OpenCTIApiClient, data: Di
os.environ["OPENCTI_TOKEN"] = api_client.api_token
os.environ["OPENCTI_SSL_VERIFY"] = str(api_client.ssl_verify)
os.environ["OPENCTI_JSON_LOGGING"] = "true"
os.environ["CONNECTOR_EXPOSE_METRICS"] = "true"
os.environ["CONNECTOR_METRICS_PORT"] = "9096"

config = (
yaml.load(open(config_file_path), Loader=yaml.FullLoader)
Expand Down

0 comments on commit d87077d

Please sign in to comment.