From 47cb3ecda27990524ade224aba3134edf49e4b2c Mon Sep 17 00:00:00 2001 From: mbonniot Date: Thu, 20 Feb 2025 17:14:17 +0100 Subject: [PATCH 01/11] [API_PARSER][CROWDSTRIKE] Fix offset the case of gathering more than 100 logs in one request --- .../api_parser/crowdstrike/crowdstrike.py | 125 +++++++++--------- 1 file changed, 63 insertions(+), 62 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 92ebe46b..00a78ca0 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -56,6 +56,7 @@ class CrowdstrikeParser(ApiParser): 'accept': 'application/json' } + def __init__(self, data): super().__init__(data) @@ -72,7 +73,6 @@ def __init__(self, data): if not self.api_host.startswith('https://'): self.api_host = f"https://{self.api_host}" - self.login() def login(self): logger.info(f"[{__parser__}][login]: Login in...", extra={'frontend': str(self.frontend)}) @@ -81,12 +81,14 @@ def login(self): self.session = requests.session() self.session.headers.update(self.HEADERS) - payload = {'client_id': self.client_id, - 'client_secret': self.client_secret} try: + # Rate limiting seems to be 10/minute over the auth_url endpoint response = self.session.post( auth_url, - data=payload, + data={ + 'client_id': self.client_id, + 'client_secret': self.client_secret + }, timeout=10, proxies=self.proxies, verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl @@ -114,6 +116,7 @@ def login(self): return True, self.session + def __execute_query(self, method, url, query, timeout=10): retry = 3 while(retry > 0): @@ -149,6 +152,7 @@ def __execute_query(self, method, url, query, timeout=10): return {} return response.json() + def unionDict(self, dictBase, dictToAdd): finalDict = {} for k, v in dictBase.items(): @@ -161,36 +165,36 @@ def unionDict(self, dictBase, dictToAdd): return finalDict def execute_query(self, method, url, query={}, timeout=10): - # can set a custom limit of entry we want to retrieve + # can set a custom limit of entry we want to retrieve - + # by default to -1 ensuring line 178 to not break on first page + # by default the result per page is 100 for this API customLimit = int(query.get('limit', -1)) jsonResp = self.__execute_query(method, url, query, timeout=timeout) totalToRetrieve = jsonResp.get('meta', {}).get('pagination', {}).get('total', 0) while(totalToRetrieve > 0 and totalToRetrieve != len(jsonResp.get('resources', []))): - # we retrieved enough data - if(customLimit > 0 and customLimit <= len(jsonResp['resources'])): - break - query['offset'] = int(jsonResp['meta']['pagination']['offset']) + # if we've retrieved enough data -> break + if(customLimit > 0 and customLimit <= len(jsonResp['resources'])): break + + query['offset'] = len(jsonResp.get('resources', [])) jsonAdditionalResp = self.__execute_query( method, url, query, timeout=timeout) jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) - #jsonResp += [jsonAdditionalResp] return jsonResp + def getSummary(self): nbSensor = self.getSensorsTotal() - version, updated = self.getApplianceVersion() + version, updated = 0, 0 # This is the appliance version return nbSensor, version, updated - def getApplianceVersion(self): - return 0, 0 - def getSensorsTotal(self): device_url = f"{self.api_host}/{self.DEVICE_URI}" ret = self.execute_query('GET', device_url, {'limit': 1}) return 
int(ret['meta']['pagination']['total']) + def getAlerts(self, since, to): ''' we retrieve raw incidents and detections @@ -198,7 +202,7 @@ def getAlerts(self, since, to): logger.debug(f"[{__parser__}][getAlerts]: From {since} until {to}", extra={'frontend': str(self.frontend)}) finalRawAlerts = [] - # first retrieve the detection raw ids + # first retrieve the detections raw ids alert_url = f"{self.api_host}/{self.DETECTION_URI}" payload = { "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", @@ -206,32 +210,32 @@ def getAlerts(self, since, to): } ret = self.execute_query("GET", alert_url, payload) ids = ret['resources'] - if(len(ids) > 0): - # retrieve the content of detection selected - alert_url = f"{self.api_host}/{self.DETECTION_DETAILS_URI}" - payload = {"ids": ids} - ret = self.execute_query("POST", alert_url, payload) + # then retrieve the content of selected detections ids + if(len(ids) > 0): + ret = self.execute_query(method="POST", + url=f"{self.api_host}/{self.DETECTION_DETAILS_URI}", + query={"ids": ids}) alerts = ret['resources'] for alert in alerts: finalRawAlerts += [alert] + # finally, if "request incidents" checkbox is set in GUI, + # retrieve also the incident raw ids if self.request_incidents: - # then retrieve the incident raw ids alert_url = f"{self.api_host}/{self.INCIDENT_URI}" payload = { "filter": f"start:>'{since}'+start:<='{to}'", "sort": "end|desc" } - ret = self.execute_query("GET", alert_url, payload) ids = ret['resources'] if(len(ids) > 0): - # retrieve the content of selected incidents - alert_url = f"{self.api_host}/{self.INCIDENT_DETAILS_URI}" - payload = {"ids": ids} - ret = self.execute_query("POST", alert_url, payload) + # then retrieve the content of selected incidents ids + ret = self.execute_query(method="POST", + url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", + query={"ids": ids}) alerts = ret['resources'] for alert in alerts: finalRawAlerts += [alert] @@ -253,45 +257,10 @@ def format_log(self, log): log['url'] = self.api_host return json.dumps(log) - def execute(self): - # Retrieve version of cybereason console - _, self.observer_version, _ = self.getSummary() - - self.kind = "details" - # Default timestamp is 24 hours ago - since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7)) - # Get a batch of 24h at most, to avoid running the parser for too long - # delay the query time of 2 minutes, to avoid missing events - to = min(timezone.now()-timedelta(minutes=2), since + timedelta(hours=24)) - to = to.strftime("%Y-%m-%dT%H:%M:%SZ") - since = since.strftime("%Y-%m-%dT%H:%M:%SZ") - tmp_logs = self.get_logs(self.kind, since=since, to=to) - - # Downloading may take some while, so refresh token in Redis - self.update_lock() - - total = len(tmp_logs) - - if total > 0: - logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)}) - - # Logs sorted by timestamp descending, so first is newer - self.frontend.last_api_call = to - - elif self.last_api_call < timezone.now()-timedelta(hours=24): - # If no logs where retrieved during the last 24hours, - # move forward 1h to prevent stagnate ad vitam eternam - self.frontend.last_api_call += timedelta(hours=1) - - self.write_to_file([self.format_log(log) for log in tmp_logs]) - - # Writting may take some while, so refresh token in Redis - self.update_lock() - - logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)}) def test(self): try: + self.login() # establish a session to console 
logger.debug(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)}) query_time = (timezone.now() - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%SZ") @@ -311,3 +280,35 @@ def test(self): "status": False, "error": str(e) } + + + def execute(self): + self.login() # establish a session to console + _, self.observer_version, _ = self.getSummary() # Retrieve version of crowdstrike console + self.kind = "details" + + # Default timestamp is 24 hours ago + since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7)) + # Get a batch of 24h at most, to avoid running queries for too long + # also delay the query time of 3 minutes, to avoid missing events + to = min(timezone.now()-timedelta(minutes=3), since + timedelta(hours=24)) + to = to.strftime("%Y-%m-%dT%H:%M:%SZ") + since = since.strftime("%Y-%m-%dT%H:%M:%SZ") + + tmp_logs = self.get_logs(self.kind, since=since, to=to) + self.update_lock() + + total = len(tmp_logs) + if total > 0: + logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)}) + self.frontend.last_api_call = to # Logs sorted by timestamp descending, so first is newer + + # If no logs where retrieved during the last 24hours, + # move forward 1h to prevent stagnate ad vitam eternam + elif self.last_api_call < timezone.now()-timedelta(hours=24): + self.frontend.last_api_call += timedelta(hours=1) + + self.write_to_file([self.format_log(log) for log in tmp_logs]) + self.update_lock() + + logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)}) From 790aa3a76c57f0e05f8d4e8a1ccd104282d34143 Mon Sep 17 00:00:00 2001 From: mbonniot Date: Thu, 20 Feb 2025 18:14:00 +0100 Subject: [PATCH 02/11] [CROWDSTRIKE] Continue to clean code the code --- .../api_parser/crowdstrike/crowdstrike.py | 132 ++++++++---------- 1 file changed, 62 insertions(+), 70 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 00a78ca0..41ea180d 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -56,7 +56,6 @@ class CrowdstrikeParser(ApiParser): 'accept': 'application/json' } - def __init__(self, data): super().__init__(data) @@ -65,11 +64,10 @@ def __init__(self, data): self.client_secret = data["crowdstrike_client_secret"] self.client = data["crowdstrike_client"] - self.product = 'crowdstrike' - self.session = None - self.request_incidents = data.get('crowdstrike_request_incidents', False) + self.session = None + if not self.api_host.startswith('https://'): self.api_host = f"https://{self.api_host}" @@ -99,13 +97,13 @@ def login(self): return False, ('Connection failed') except requests.exceptions.ReadTimeout: self.session = None - logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (read_timeout)', extra={'frontend': str(self.frontend)}) + logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (ReadTimeout)', extra={'frontend': str(self.frontend)}) return False, ('Connection failed') response.raise_for_status() if response.status_code not in [200, 201]: self.session = None - logger.error(f'[{__parser__}][login]: Authentication failed. code {response.status_code}', extra={'frontend': str(self.frontend)}) + logger.error(f'[{__parser__}][login]: Authentication failed. 
Status code {response.status_code}', extra={'frontend': str(self.frontend)}) return False, ('Authentication failed') ret = response.json() @@ -131,11 +129,10 @@ def __execute_query(self, method, url, query, timeout=10): verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl ) elif(method == "POST"): - headers = {'Content-Type': 'application/json'} response = self.session.post( url, data=json.dumps(query), - headers=headers, + headers={'Content-Type': 'application/json'}, timeout=timeout, proxies=self.proxies, verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl @@ -165,9 +162,8 @@ def unionDict(self, dictBase, dictToAdd): return finalDict def execute_query(self, method, url, query={}, timeout=10): - # can set a custom limit of entry we want to retrieve - - # by default to -1 ensuring line 178 to not break on first page - # by default the result per page is 100 for this API + # Setting a custom limit to -1 by default ensuring line 178 to not break on first page collection. + # The result per page is 100 by default for this API (when not using limit as it is the case here). customLimit = int(query.get('limit', -1)) jsonResp = self.__execute_query(method, url, query, timeout=timeout) @@ -178,37 +174,24 @@ def execute_query(self, method, url, query={}, timeout=10): if(customLimit > 0 and customLimit <= len(jsonResp['resources'])): break query['offset'] = len(jsonResp.get('resources', [])) - jsonAdditionalResp = self.__execute_query( - method, url, query, timeout=timeout) + jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout) + jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) return jsonResp - def getSummary(self): - nbSensor = self.getSensorsTotal() - version, updated = 0, 0 # This is the appliance version - return nbSensor, version, updated + def get_detections(self, since, to): + logger.debug(f"[{__parser__}][get_detections]: From {since} until {to}", extra={'frontend': str(self.frontend)}) - def getSensorsTotal(self): - device_url = f"{self.api_host}/{self.DEVICE_URI}" - ret = self.execute_query('GET', device_url, {'limit': 1}) - return int(ret['meta']['pagination']['total']) - - - def getAlerts(self, since, to): - ''' - we retrieve raw incidents and detections - ''' - logger.debug(f"[{__parser__}][getAlerts]: From {since} until {to}", extra={'frontend': str(self.frontend)}) - - finalRawAlerts = [] + detections = [] # first retrieve the detections raw ids - alert_url = f"{self.api_host}/{self.DETECTION_URI}" payload = { "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", "sort": "last_behavior|desc" } - ret = self.execute_query("GET", alert_url, payload) + ret = self.execute_query("GET", + f"{self.api_host}/{self.DETECTION_URI}", + payload) ids = ret['resources'] # then retrieve the content of selected detections ids @@ -218,43 +201,54 @@ def getAlerts(self, since, to): query={"ids": ids}) alerts = ret['resources'] for alert in alerts: - finalRawAlerts += [alert] - - # finally, if "request incidents" checkbox is set in GUI, - # retrieve also the incident raw ids - if self.request_incidents: - alert_url = f"{self.api_host}/{self.INCIDENT_URI}" - payload = { - "filter": f"start:>'{since}'+start:<='{to}'", - "sort": "end|desc" - } - ret = self.execute_query("GET", alert_url, payload) - ids = ret['resources'] - - if(len(ids) > 0): - # then retrieve the content of selected incidents ids - ret = self.execute_query(method="POST", - 
url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", - query={"ids": ids}) - alerts = ret['resources'] - for alert in alerts: - finalRawAlerts += [alert] - return finalRawAlerts - - def get_logs(self, kind, since, to): - msg = f"Querying {kind} from {since}, to {to}" - logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) + detections += [alert] + return detections + + def get_incidents(self, since, to): + logger.debug(f"[{__parser__}][get_incidents]: From {since} until {to}", extra={'frontend': str(self.frontend)}) + incidents = [] + # first retrieve the incidents raw ids + payload = { + "filter": f"start:>'{since}'+start:<='{to}'", + "sort": "end|desc" + } + ret = self.execute_query("GET", + f"{self.api_host}/{self.INCIDENT_URI}", + payload) + ids = ret['resources'] + + if(len(ids) > 0): + # then retrieve the content of selected incidents ids + ret = self.execute_query(method="POST", + url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", + query={"ids": ids}) + alerts = ret['resources'] + for alert in alerts: + incidents += [alert] + return incidents + + def get_logs(self, since, to): try: - return self.getAlerts(since, to) + kind = "detections" + msg = f"Querying {kind} from {since}, to {to}" + logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) + detections = self.get_detections(since, to) + + kind = "incidents" + msg = f"Querying {kind} from {since}, to {to}" + logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) + if self.request_incidents: + incidents = self.get_incidents(since, to) + except Exception as e: logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)}) raise Exception(f"Error querying {kind} logs") + return detections + incidents + def format_log(self, log): - log['kind'] = self.kind - log['observer_version'] = self.observer_version - log['url'] = self.api_host + log['url'] = self.api_host # This static field is mandatory for parser return json.dumps(log) @@ -263,12 +257,12 @@ def test(self): self.login() # establish a session to console logger.debug(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)}) - query_time = (timezone.now() - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%SZ") + since = (timezone.now() - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%SZ") to = timezone.now().strftime("%Y-%m-%dT%H:%M:%SZ") - logs = self.get_logs("details", query_time, to) + logs = self.get_logs(since, to) - msg = f"{len(logs)} details retrieved" + msg = f"{len(logs)} logs retrieved" logger.info(f"[{__parser__}][test]: {msg}", extra={'frontend': str(self.frontend)}) return { "status": True, @@ -284,8 +278,6 @@ def test(self): def execute(self): self.login() # establish a session to console - _, self.observer_version, _ = self.getSummary() # Retrieve version of crowdstrike console - self.kind = "details" # Default timestamp is 24 hours ago since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7)) @@ -295,10 +287,10 @@ def execute(self): to = to.strftime("%Y-%m-%dT%H:%M:%SZ") since = since.strftime("%Y-%m-%dT%H:%M:%SZ") - tmp_logs = self.get_logs(self.kind, since=since, to=to) + logs = self.get_logs(since=since, to=to) self.update_lock() - total = len(tmp_logs) + total = len(logs) if total > 0: logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)}) self.frontend.last_api_call = to # Logs sorted by timestamp descending, so first is newer @@ -308,7 +300,7 
@@ def execute(self): elif self.last_api_call < timezone.now()-timedelta(hours=24): self.frontend.last_api_call += timedelta(hours=1) - self.write_to_file([self.format_log(log) for log in tmp_logs]) + self.write_to_file([self.format_log(log) for log in logs]) self.update_lock() logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)}) From 9efce88883e0f22a4c7f951b51147e8b24a1b863 Mon Sep 17 00:00:00 2001 From: mbonniot Date: Fri, 21 Feb 2025 11:05:52 +0100 Subject: [PATCH 03/11] [CROWDSTRIKE] Fix none incidents object (if option not set) + clean code --- .../api_parser/crowdstrike/crowdstrike.py | 49 +++++++++---------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 41ea180d..dc276c32 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -74,15 +74,14 @@ def __init__(self, data): def login(self): logger.info(f"[{__parser__}][login]: Login in...", extra={'frontend': str(self.frontend)}) - auth_url = f"{self.api_host}/{self.AUTH_URI}" self.session = requests.session() self.session.headers.update(self.HEADERS) try: - # Rate limiting seems to be 10/minute over the auth_url endpoint + # Rate limiting seems to be 10/minute for the authentication endpoint response = self.session.post( - auth_url, + url=f"{self.api_host}/{self.AUTH_URI}", data={ 'client_id': self.client_id, 'client_secret': self.client_secret @@ -97,7 +96,7 @@ def login(self): return False, ('Connection failed') except requests.exceptions.ReadTimeout: self.session = None - logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (ReadTimeout)', extra={'frontend': str(self.frontend)}) + logger.error(f'[{__parser__}][login]: Connection failed (ReadTimeout)', extra={'frontend': str(self.frontend)}) return False, ('Connection failed') response.raise_for_status() @@ -143,9 +142,8 @@ def __execute_query(self, method, url, query, timeout=10): break # no error we break from the loop if response.status_code not in [200, 201]: - logger.error( - f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", extra={'frontend': str(self.frontend)} - ) + logger.error(f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", + extra={'frontend': str(self.frontend)}) return {} return response.json() @@ -193,7 +191,6 @@ def get_detections(self, since, to): f"{self.api_host}/{self.DETECTION_URI}", payload) ids = ret['resources'] - # then retrieve the content of selected detections ids if(len(ids) > 0): ret = self.execute_query(method="POST", @@ -217,9 +214,8 @@ def get_incidents(self, since, to): f"{self.api_host}/{self.INCIDENT_URI}", payload) ids = ret['resources'] - + # then retrieve the content of selected incidents ids if(len(ids) > 0): - # then retrieve the content of selected incidents ids ret = self.execute_query(method="POST", url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", query={"ids": ids}) @@ -229,23 +225,22 @@ def get_incidents(self, since, to): return incidents def get_logs(self, since, to): - try: - kind = "detections" - msg = f"Querying {kind} from {since}, to {to}" - logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) - detections = self.get_detections(since, to) + logs = [] - 
kind = "incidents" - msg = f"Querying {kind} from {since}, to {to}" - logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) - if self.request_incidents: - incidents = self.get_incidents(since, to) - - except Exception as e: - logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)}) - raise Exception(f"Error querying {kind} logs") + kinds = ["detections"] + if self.request_incidents: + kinds.append("incidents") + for kind in kinds: + try: + msg = f"Querying {kind} from {since}, to {to}" + logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) + get_func_type = getattr(self, f"get_{kind}") + logs.extend(get_func_type(since, to)) - return detections + incidents + except Exception as e: + logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)}) + raise Exception(f"Error querying {kind} logs") + return logs def format_log(self, log): log['url'] = self.api_host # This static field is mandatory for parser @@ -255,9 +250,9 @@ def format_log(self, log): def test(self): try: self.login() # establish a session to console - logger.debug(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)}) + logger.info(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)}) - since = (timezone.now() - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%SZ") + since = (timezone.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ") to = timezone.now().strftime("%Y-%m-%dT%H:%M:%SZ") logs = self.get_logs(since, to) From 6472412138a610d25c9974957b534c27bbb68d6b Mon Sep 17 00:00:00 2001 From: mbonniot Date: Fri, 21 Feb 2025 12:04:00 +0100 Subject: [PATCH 04/11] [CROWDSTRIKE] Re-arrange update_lock to avoid completely multiple executions in case of infinite loop --- .../api_parser/crowdstrike/crowdstrike.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index dc276c32..90e79240 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -164,15 +164,17 @@ def execute_query(self, method, url, query={}, timeout=10): # The result per page is 100 by default for this API (when not using limit as it is the case here). 
customLimit = int(query.get('limit', -1)) + # Get first page of logs jsonResp = self.__execute_query(method, url, query, timeout=timeout) totalToRetrieve = jsonResp.get('meta', {}).get('pagination', {}).get('total', 0) - + # Continue to paginate if more than customLimit number of logs in totalToRetrieve while(totalToRetrieve > 0 and totalToRetrieve != len(jsonResp.get('resources', []))): # if we've retrieved enough data -> break if(customLimit > 0 and customLimit <= len(jsonResp['resources'])): break query['offset'] = len(jsonResp.get('resources', [])) jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout) + self.update_lock() jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) return jsonResp @@ -187,18 +189,17 @@ def get_detections(self, since, to): "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", "sort": "last_behavior|desc" } - ret = self.execute_query("GET", - f"{self.api_host}/{self.DETECTION_URI}", - payload) + ret = self.execute_query(method="GET", + url=f"{self.api_host}/{self.DETECTION_URI}", + query=payload) ids = ret['resources'] # then retrieve the content of selected detections ids if(len(ids) > 0): ret = self.execute_query(method="POST", url=f"{self.api_host}/{self.DETECTION_DETAILS_URI}", query={"ids": ids}) - alerts = ret['resources'] - for alert in alerts: - detections += [alert] + ret = ret['resources'] + for alert in ret: detections += [alert] return detections def get_incidents(self, since, to): @@ -210,32 +211,33 @@ def get_incidents(self, since, to): "filter": f"start:>'{since}'+start:<='{to}'", "sort": "end|desc" } - ret = self.execute_query("GET", - f"{self.api_host}/{self.INCIDENT_URI}", - payload) + ret = self.execute_query(method="GET", + url=f"{self.api_host}/{self.INCIDENT_URI}", + query=payload) ids = ret['resources'] # then retrieve the content of selected incidents ids if(len(ids) > 0): ret = self.execute_query(method="POST", - url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", - query={"ids": ids}) - alerts = ret['resources'] - for alert in alerts: - incidents += [alert] + url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", + query={"ids": ids}) + ret = ret['resources'] + for alert in ret: incidents += [alert] return incidents def get_logs(self, since, to): logs = [] kinds = ["detections"] - if self.request_incidents: - kinds.append("incidents") + if self.request_incidents: kinds.append("incidents") + for kind in kinds: try: msg = f"Querying {kind} from {since}, to {to}" logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) + + # get new logs get_func_type = getattr(self, f"get_{kind}") - logs.extend(get_func_type(since, to)) + logs.extend(get_func_type(since, to)) except Exception as e: logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)}) @@ -283,7 +285,6 @@ def execute(self): since = since.strftime("%Y-%m-%dT%H:%M:%SZ") logs = self.get_logs(since=since, to=to) - self.update_lock() total = len(logs) if total > 0: From 0af77d863bc42a0213af067e35e683b88261bd46 Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 26 Feb 2025 12:54:35 +0100 Subject: [PATCH 05/11] [API_PARSER][CROWDSTRIKE] Clean execute_query be removing unused/confusing customLimit & small cleans over execute, get_detections, get_incidents --- .../api_parser/crowdstrike/crowdstrike.py | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py 
b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 90e79240..8af2ccb3 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -160,23 +160,22 @@ def unionDict(self, dictBase, dictToAdd): return finalDict def execute_query(self, method, url, query={}, timeout=10): - # Setting a custom limit to -1 by default ensuring line 178 to not break on first page collection. - # The result per page is 100 by default for this API (when not using limit as it is the case here). - customLimit = int(query.get('limit', -1)) + # query['limit'] = 100 # Get first page of logs - jsonResp = self.__execute_query(method, url, query, timeout=timeout) + jsonResp = self.__execute_query(method, url, query, timeout=timeout) totalToRetrieve = jsonResp.get('meta', {}).get('pagination', {}).get('total', 0) - # Continue to paginate if more than customLimit number of logs in totalToRetrieve - while(totalToRetrieve > 0 and totalToRetrieve != len(jsonResp.get('resources', []))): - # if we've retrieved enough data -> break - if(customLimit > 0 and customLimit <= len(jsonResp['resources'])): break - query['offset'] = len(jsonResp.get('resources', [])) - jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout) - self.update_lock() + if totalToRetrieve > 0: + # Continue to paginate while totalToRetrieve is different than the length of all logs gathered from successive paginations + # The default page size is 100 (when "limit" parameter is not passed to query, like the case here) + while(totalToRetrieve != len(jsonResp.get('resources', []))): + query['offset'] = len(jsonResp.get('resources', [])) - jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) + jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout) + self.update_lock() + + jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) return jsonResp @@ -185,13 +184,12 @@ def get_detections(self, since, to): detections = [] # first retrieve the detections raw ids - payload = { - "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", - "sort": "last_behavior|desc" - } ret = self.execute_query(method="GET", url=f"{self.api_host}/{self.DETECTION_URI}", - query=payload) + query={ + "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", + "sort": "last_behavior|desc" + }) ids = ret['resources'] # then retrieve the content of selected detections ids if(len(ids) > 0): @@ -207,13 +205,12 @@ def get_incidents(self, since, to): incidents = [] # first retrieve the incidents raw ids - payload = { - "filter": f"start:>'{since}'+start:<='{to}'", - "sort": "end|desc" - } ret = self.execute_query(method="GET", url=f"{self.api_host}/{self.INCIDENT_URI}", - query=payload) + query={ + "filter": f"start:>'{since}'+start:<='{to}'", + "sort": "end|desc" + }) ids = ret['resources'] # then retrieve the content of selected incidents ids if(len(ids) > 0): @@ -285,15 +282,13 @@ def execute(self): since = since.strftime("%Y-%m-%dT%H:%M:%SZ") logs = self.get_logs(since=since, to=to) - total = len(logs) if total > 0: logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)}) self.frontend.last_api_call = to # Logs sorted by timestamp descending, so first is newer - - # If no logs where retrieved during the last 24hours, - # move forward 1h to prevent stagnate ad vitam eternam elif self.last_api_call < timezone.now()-timedelta(hours=24): + # If no logs where retrieved during the last 24hours, + # move 
forward 1h to prevent stagnate ad vitam eternam self.frontend.last_api_call += timedelta(hours=1) self.write_to_file([self.format_log(log) for log in logs]) From 90651df1441fc2bf4a8c56f65b7171de70bda61b Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 26 Feb 2025 13:02:39 +0100 Subject: [PATCH 06/11] [API_PARSER][CROWDSTRIKE] Comment unusued devices endpoint --- vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 8af2ccb3..7c2c959f 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -45,7 +45,7 @@ class CrowdstrikeAPIError(Exception): class CrowdstrikeParser(ApiParser): AUTH_URI = "oauth2/token" - DEVICE_URI = "devices/queries/devices/v1" + # DEVICE_URI = "devices/queries/devices/v1" DETECTION_URI = "detects/queries/detects/v1" DETECTION_DETAILS_URI = "detects/entities/summaries/GET/v1" INCIDENT_URI = "incidents/queries/incidents/v1" From d6f00e5739033e6fcd4b67d8befa1af83293dd34 Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 26 Feb 2025 13:17:41 +0100 Subject: [PATCH 07/11] [API_PARSER][CROWDSTRIKE] Restore self.client_id in ReadTimeout logger & remove useless comment --- vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 7c2c959f..fd09b18f 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -96,7 +96,7 @@ def login(self): return False, ('Connection failed') except requests.exceptions.ReadTimeout: self.session = None - logger.error(f'[{__parser__}][login]: Connection failed (ReadTimeout)', extra={'frontend': str(self.frontend)}) + logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (ReadTimeout)', extra={'frontend': str(self.frontend)}) return False, ('Connection failed') response.raise_for_status() @@ -231,8 +231,6 @@ def get_logs(self, since, to): try: msg = f"Querying {kind} from {since}, to {to}" logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) - - # get new logs get_func_type = getattr(self, f"get_{kind}") logs.extend(get_func_type(since, to)) From 95a03bc7c27ae767d220d2b21a16b01b73118a27 Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 26 Feb 2025 14:24:54 +0100 Subject: [PATCH 08/11] [API_PARSER][CROWDSTRIKE] Restore old collector version and introduce only offset bug fix --- .../api_parser/crowdstrike/crowdstrike.py | 250 ++++++++++-------- 1 file changed, 134 insertions(+), 116 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index fd09b18f..415cce3f 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -45,7 +45,7 @@ class CrowdstrikeAPIError(Exception): class CrowdstrikeParser(ApiParser): AUTH_URI = "oauth2/token" - # DEVICE_URI = "devices/queries/devices/v1" + DEVICE_URI = "devices/queries/devices/v1" DETECTION_URI = "detects/queries/detects/v1" DETECTION_DETAILS_URI = "detects/entities/summaries/GET/v1" INCIDENT_URI = "incidents/queries/incidents/v1" @@ -64,28 +64,29 @@ def __init__(self, 
data): self.client_secret = data["crowdstrike_client_secret"] self.client = data["crowdstrike_client"] - self.request_incidents = data.get('crowdstrike_request_incidents', False) - + self.product = 'crowdstrike' self.session = None + self.request_incidents = data.get('crowdstrike_request_incidents', False) + if not self.api_host.startswith('https://'): self.api_host = f"https://{self.api_host}" + self.login() def login(self): logger.info(f"[{__parser__}][login]: Login in...", extra={'frontend': str(self.frontend)}) + auth_url = f"{self.api_host}/{self.AUTH_URI}" self.session = requests.session() self.session.headers.update(self.HEADERS) + payload = {'client_id': self.client_id, + 'client_secret': self.client_secret} try: - # Rate limiting seems to be 10/minute for the authentication endpoint response = self.session.post( - url=f"{self.api_host}/{self.AUTH_URI}", - data={ - 'client_id': self.client_id, - 'client_secret': self.client_secret - }, + auth_url, + data=payload, timeout=10, proxies=self.proxies, verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl @@ -96,13 +97,13 @@ def login(self): return False, ('Connection failed') except requests.exceptions.ReadTimeout: self.session = None - logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (ReadTimeout)', extra={'frontend': str(self.frontend)}) + logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (read_timeout)', extra={'frontend': str(self.frontend)}) return False, ('Connection failed') response.raise_for_status() if response.status_code not in [200, 201]: self.session = None - logger.error(f'[{__parser__}][login]: Authentication failed. Status code {response.status_code}', extra={'frontend': str(self.frontend)}) + logger.error(f'[{__parser__}][login]: Authentication failed. 
code {response.status_code}', extra={'frontend': str(self.frontend)}) return False, ('Authentication failed') ret = response.json() @@ -113,7 +114,6 @@ def login(self): return True, self.session - def __execute_query(self, method, url, query, timeout=10): retry = 3 while(retry > 0): @@ -128,10 +128,11 @@ def __execute_query(self, method, url, query, timeout=10): verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl ) elif(method == "POST"): + headers = {'Content-Type': 'application/json'} response = self.session.post( url, data=json.dumps(query), - headers={'Content-Type': 'application/json'}, + headers=headers, timeout=timeout, proxies=self.proxies, verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl @@ -142,12 +143,12 @@ def __execute_query(self, method, url, query, timeout=10): break # no error we break from the loop if response.status_code not in [200, 201]: - logger.error(f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", - extra={'frontend': str(self.frontend)}) + logger.error( + f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", extra={'frontend': str(self.frontend)} + ) return {} return response.json() - def unionDict(self, dictBase, dictToAdd): finalDict = {} for k, v in dictBase.items(): @@ -160,101 +161,145 @@ def unionDict(self, dictBase, dictToAdd): return finalDict def execute_query(self, method, url, query={}, timeout=10): - # query['limit'] = 100 + # can set a custom limit of entry we want to retrieve + customLimit = int(query.get('limit', -1)) - # Get first page of logs - jsonResp = self.__execute_query(method, url, query, timeout=timeout) + jsonResp = self.__execute_query(method, url, query, timeout=timeout) totalToRetrieve = jsonResp.get('meta', {}).get('pagination', {}).get('total', 0) - if totalToRetrieve > 0: - # Continue to paginate while totalToRetrieve is different than the length of all logs gathered from successive paginations - # The default page size is 100 (when "limit" parameter is not passed to query, like the case here) - while(totalToRetrieve != len(jsonResp.get('resources', []))): - query['offset'] = len(jsonResp.get('resources', [])) - - jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout) - self.update_lock() - - jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) + while(totalToRetrieve > 0 and totalToRetrieve != len(jsonResp.get('resources', []))): + # we retrieved enough data + if(customLimit > 0 and customLimit <= len(jsonResp['resources'])): + break + query['offset'] = len(jsonResp['resources']) + jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout) + self.update_lock() + jsonResp = self.unionDict(jsonResp, jsonAdditionalResp) + #jsonResp += [jsonAdditionalResp] return jsonResp - - def get_detections(self, since, to): - logger.debug(f"[{__parser__}][get_detections]: From {since} until {to}", extra={'frontend': str(self.frontend)}) - - detections = [] - # first retrieve the detections raw ids - ret = self.execute_query(method="GET", - url=f"{self.api_host}/{self.DETECTION_URI}", - query={ - "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", - "sort": "last_behavior|desc" - }) + def getSummary(self): + nbSensor = self.getSensorsTotal() + version, updated = self.getApplianceVersion() + return nbSensor, 
version, updated + + def getApplianceVersion(self): + return 0, 0 + + def getSensorsTotal(self): + device_url = f"{self.api_host}/{self.DEVICE_URI}" + ret = self.execute_query('GET', device_url, {'limit': 1}) + return int(ret['meta']['pagination']['total']) + + def getAlerts(self, since, to): + ''' + we retrieve raw incidents and detections + ''' + logger.debug(f"[{__parser__}][getAlerts]: From {since} until {to}", extra={'frontend': str(self.frontend)}) + + finalRawAlerts = [] + # first retrieve the detection raw ids + alert_url = f"{self.api_host}/{self.DETECTION_URI}" + payload = { + "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'", + "sort": "last_behavior|desc" + } + ret = self.execute_query("GET", alert_url, payload) ids = ret['resources'] - # then retrieve the content of selected detections ids if(len(ids) > 0): - ret = self.execute_query(method="POST", - url=f"{self.api_host}/{self.DETECTION_DETAILS_URI}", - query={"ids": ids}) - ret = ret['resources'] - for alert in ret: detections += [alert] - return detections - - def get_incidents(self, since, to): - logger.debug(f"[{__parser__}][get_incidents]: From {since} until {to}", extra={'frontend': str(self.frontend)}) - - incidents = [] - # first retrieve the incidents raw ids - ret = self.execute_query(method="GET", - url=f"{self.api_host}/{self.INCIDENT_URI}", - query={ - "filter": f"start:>'{since}'+start:<='{to}'", - "sort": "end|desc" - }) - ids = ret['resources'] - # then retrieve the content of selected incidents ids - if(len(ids) > 0): - ret = self.execute_query(method="POST", - url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}", - query={"ids": ids}) - ret = ret['resources'] - for alert in ret: incidents += [alert] - return incidents + # retrieve the content of detection selected + alert_url = f"{self.api_host}/{self.DETECTION_DETAILS_URI}" + payload = {"ids": ids} + ret = self.execute_query("POST", alert_url, payload) + + alerts = ret['resources'] + for alert in alerts: + finalRawAlerts += [alert] + + if self.request_incidents: + # then retrieve the incident raw ids + alert_url = f"{self.api_host}/{self.INCIDENT_URI}" + payload = { + "filter": f"start:>'{since}'+start:<='{to}'", + "sort": "end|desc" + } - def get_logs(self, since, to): - logs = [] + ret = self.execute_query("GET", alert_url, payload) + ids = ret['resources'] - kinds = ["detections"] - if self.request_incidents: kinds.append("incidents") + if(len(ids) > 0): + # retrieve the content of selected incidents + alert_url = f"{self.api_host}/{self.INCIDENT_DETAILS_URI}" + payload = {"ids": ids} + ret = self.execute_query("POST", alert_url, payload) + alerts = ret['resources'] + for alert in alerts: + finalRawAlerts += [alert] + return finalRawAlerts - for kind in kinds: - try: - msg = f"Querying {kind} from {since}, to {to}" - logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) - get_func_type = getattr(self, f"get_{kind}") - logs.extend(get_func_type(since, to)) + def get_logs(self, kind, since, to): + msg = f"Querying {kind} from {since}, to {to}" + logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)}) - except Exception as e: - logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)}) - raise Exception(f"Error querying {kind} logs") - return logs + try: + return self.getAlerts(since, to) + except Exception as e: + logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)}) + raise Exception(f"Error querying {kind} logs") 
def format_log(self, log): - log['url'] = self.api_host # This static field is mandatory for parser + log['kind'] = self.kind + log['observer_version'] = self.observer_version + log['url'] = self.api_host return json.dumps(log) + def execute(self): + # Retrieve version of cybereason console + _, self.observer_version, _ = self.getSummary() + + self.kind = "details" + # Default timestamp is 24 hours ago + since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7)) + # Get a batch of 24h at most, to avoid running the parser for too long + # delay the query time of 2 minutes, to avoid missing events + to = min(timezone.now()-timedelta(minutes=2), since + timedelta(hours=24)) + to = to.strftime("%Y-%m-%dT%H:%M:%SZ") + since = since.strftime("%Y-%m-%dT%H:%M:%SZ") + tmp_logs = self.get_logs(self.kind, since=since, to=to) + + # Downloading may take some while, so refresh token in Redis + self.update_lock() + + total = len(tmp_logs) + + if total > 0: + logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)}) + + # Logs sorted by timestamp descending, so first is newer + self.frontend.last_api_call = to + + elif self.last_api_call < timezone.now()-timedelta(hours=24): + # If no logs where retrieved during the last 24hours, + # move forward 1h to prevent stagnate ad vitam eternam + self.frontend.last_api_call += timedelta(hours=1) + + self.write_to_file([self.format_log(log) for log in tmp_logs]) + + # Writting may take some while, so refresh token in Redis + self.update_lock() + + logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)}) def test(self): try: - self.login() # establish a session to console - logger.info(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)}) + logger.debug(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)}) - since = (timezone.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ") + query_time = (timezone.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ") to = timezone.now().strftime("%Y-%m-%dT%H:%M:%SZ") - logs = self.get_logs(since, to) + logs = self.get_logs("details", query_time, to) - msg = f"{len(logs)} logs retrieved" + msg = f"{len(logs)} details retrieved" logger.info(f"[{__parser__}][test]: {msg}", extra={'frontend': str(self.frontend)}) return { "status": True, @@ -266,30 +311,3 @@ def test(self): "status": False, "error": str(e) } - - - def execute(self): - self.login() # establish a session to console - - # Default timestamp is 24 hours ago - since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7)) - # Get a batch of 24h at most, to avoid running queries for too long - # also delay the query time of 3 minutes, to avoid missing events - to = min(timezone.now()-timedelta(minutes=3), since + timedelta(hours=24)) - to = to.strftime("%Y-%m-%dT%H:%M:%SZ") - since = since.strftime("%Y-%m-%dT%H:%M:%SZ") - - logs = self.get_logs(since=since, to=to) - total = len(logs) - if total > 0: - logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)}) - self.frontend.last_api_call = to # Logs sorted by timestamp descending, so first is newer - elif self.last_api_call < timezone.now()-timedelta(hours=24): - # If no logs where retrieved during the last 24hours, - # move forward 1h to prevent stagnate ad vitam eternam - self.frontend.last_api_call += timedelta(hours=1) - - self.write_to_file([self.format_log(log) for log in logs]) - 
self.update_lock() - - logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)}) From e9beae05710e63d4b6ebee70dca83b58fc1901be Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 26 Feb 2025 14:56:05 +0100 Subject: [PATCH 09/11] [API_PARSER][CROWDSTRIKE] Create CHANGELOG entry --- CHANGELOG | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG b/CHANGELOG index 3a0a8ae6..04a31d3c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [API_PARSER] [CATONETWORKS] Fix invalid HTML - [API_PARSER] [WAF_CLOUDFLARE] Correctly set and use the `fields` list - [API_PARSER] [CISCO_DUO] Avoid updating last timestamp when unecessary and add a 2-minute delay to collect +- [API_PARSER] [CROWDSTRIKE] Fix infinite loop when we encounters more than 100 logs (default limit) in one request ## [2.20.0] - 2025-02-04 From 5b3c560d1c1eb978d507529fdd914cedbf9c7edd Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 26 Feb 2025 15:06:59 +0100 Subject: [PATCH 10/11] [API_PARSER][CROWDSTRIKE] Raise exception in case of execute_query fail, instead of letting the collector continue (notably in case of looping) this could create side-effects --- vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py index 415cce3f..701eaffb 100644 --- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py +++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py @@ -143,10 +143,9 @@ def __execute_query(self, method, url, query, timeout=10): break # no error we break from the loop if response.status_code not in [200, 201]: - logger.error( - f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", extra={'frontend': str(self.frontend)} - ) - return {} + msg = f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}" + logger.error(msg, extra={'frontend': str(self.frontend)}) + raise Exception(msg) return response.json() def unionDict(self, dictBase, dictToAdd): From 505a61ea0f99126e5937c458e03716fadd52af0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Bertin?= Date: Wed, 26 Feb 2025 15:38:25 +0100 Subject: [PATCH 11/11] chore(CHANGELOG): Update to correct version --- CHANGELOG | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG b/CHANGELOG index 04a31d3c..854069e2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - [API_PARSER] [NETSKOPE] Correctly update the last_collected_timestamp, even when no logs are received - [API_PARSER] [CYBEREASON] Avoid None evaluation of log objects in format_log for malops +- [API_PARSER] [CROWDSTRIKE] Fix infinite loop when we encounters more than 100 logs (default limit) in one request ## [2.22.0] - 2025-02-25 @@ -31,7 +32,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [API_PARSER] [CATONETWORKS] Fix invalid HTML - [API_PARSER] [WAF_CLOUDFLARE] Correctly set and use the `fields` list - [API_PARSER] [CISCO_DUO] Avoid updating last timestamp when unecessary and add a 2-minute delay to collect -- [API_PARSER] [CROWDSTRIKE] Fix infinite loop when we encounters more than 100 
logs (default limit) in one request ## [2.20.0] - 2025-02-04
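
The fix that gives this series its title replaces the pagination offset taken from the response's `meta.pagination.offset` field with the number of resources already collected locally; with the old value the loop never advanced past the first page once a query matched more than the default page size of 100, and the collector spun forever. A minimal sketch of the corrected loop, under the assumption of a paged endpoint that returns a `resources` list plus a `meta.pagination.total` count (the `fetch_all` helper below is illustrative, not part of the collector):

```python
import requests

def fetch_all(session: requests.Session, url: str, query: dict, timeout: int = 10) -> list:
    """Gather every resource of a paginated CrowdStrike-style query.

    The offset of the next page is derived from how many resources have
    already been gathered, so the loop advances even when the server keeps
    echoing the same 'offset' value back.
    """
    resources = []
    while True:
        page = session.get(
            url,
            params={**query, "offset": len(resources)},
            timeout=timeout,
        ).json()
        new_items = page.get("resources", [])
        if not new_items:
            break  # defensive stop: the API returned nothing new
        resources.extend(new_items)
        total = page.get("meta", {}).get("pagination", {}).get("total", 0)
        if len(resources) >= total:
            break  # all pages collected
    return resources
```

In the collector itself the equivalent loop also calls `self.update_lock()` between pages (rearranged in patch 04), so long paginated downloads keep refreshing the Redis lock instead of letting it expire mid-run.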
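
Patch 10's change to `__execute_query` complements the offset fix: returning `{}` on a non-2xx answer let a failing page silently truncate a paginated collection (and, before the offset fix, feed the loop partial data indefinitely), whereas raising aborts the run cleanly. A hedged sketch of that error-handling pattern, reusing the module's existing `CrowdstrikeAPIError` exception (the patch itself raises a plain `Exception`) and an otherwise illustrative helper name:

```python
import requests

class CrowdstrikeAPIError(Exception):
    """Raised when the API answers with an unexpected status code."""

def get_page(session: requests.Session, url: str, query: dict, timeout: int = 10) -> dict:
    response = session.get(url, params=query, timeout=timeout)
    if response.status_code not in (200, 201):
        # Propagate the failure instead of handing back an empty dict, so the
        # pagination loop stops instead of continuing on incomplete results.
        raise CrowdstrikeAPIError(
            f"API call to {url} failed: status {response.status_code}, body {response.content!r}"
        )
    return response.json()
```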