diff --git a/manager/Administration.py b/manager/Administration.py index c0e4f0a8..259c9137 100644 --- a/manager/Administration.py +++ b/manager/Administration.py @@ -88,6 +88,16 @@ def run(self, services): try: cli = JsonSocket(_cli) cmd = cli.recv() + except json.JSONDecodeError as e: + logger.error("Error decoding message from admin: {}".format(e)) + try: + cli.send({ + 'status': 'KO', + 'errors': [str(e)] + }) + except Exception as e: + logger.critical("Error while trying to reply to admin: {}".format(e)) + continue except Exception as e: logger.error("Error receiving data from admin: {0}".format(e)) continue diff --git a/manager/Services.py b/manager/Services.py index 208eb76b..f60bf477 100644 --- a/manager/Services.py +++ b/manager/Services.py @@ -42,18 +42,17 @@ def start_all(self): """ Start all the filters. """ - with self._lock: - for _, filter in self._filters.items(): - if self.start_one(filter, True): - ret = Services._wait_process_ready(filter) - if ret: - logger.error("Error when starting filter {}: {}".format(filter['name'], ret)) - self.stop_one(filter, no_lock=True) - self.clean_one(filter, no_lock=True) - else: - logger.debug("Linking UNIX sockets...") - filter['status'] = psutil.STATUS_RUNNING - call(['ln', '-s', filter['socket'], filter['socket_link']]) + for _, filter in self._filters.items(): + if self.start_one(filter, True): + ret = Services._wait_process_ready(filter) + if ret: + logger.error("Error when starting filter {}: {}".format(filter['name'], ret)) + self.stop_one(filter, no_lock=True) + self.clean_one(filter, no_lock=True) + else: + logger.debug("Linking UNIX sockets...") + filter['status'] = psutil.STATUS_RUNNING + call(['ln', '-s', filter['socket'], filter['socket_link']]) def rotate_logs_all(self): """ @@ -70,24 +69,14 @@ def stop_all(self): """ Stop all the filters """ - logger.debug("stop_all: before lock") - with self._lock: - logger.debug("stop_all: after lock") - for _, filter in self._filters.items(): - try: - self.stop_one(filter, True) - logger.debug("stop_all: after stop_one") - self.clean_one(filter, True) - logger.debug("stop_all: after clean_one") - except Exception: - pass - - def restart_all(self): - """ - Restart all the filters - """ - self.stop_all() - self.start_all() + for _, filter in self._filters.items(): + try: + self.stop_one(filter, True) + logger.debug("stop_all: after stop_one") + self.clean_one(filter, True) + logger.debug("stop_all: after clean_one") + except Exception: + pass @staticmethod def _build_cmd(filt): @@ -340,9 +329,16 @@ def update(self, names, prefix, suffix): logger.info("Update: Configuration loaded") + if not names: + # Do a symetric diff of 2 sets, composed of the keys from current filters' dict and the dict loaded from conf + # and unpack values as a list + # This yields a list of the new and deleted filters (by name only) = a diff of configured filters + names = [*(set(self._filters.keys()) ^ set(conf_filters.keys()))] + with self._lock: errors = [] new = {} + logger.info("updating filters {}".format(names)) for n in names: try: new[n] = conf_filters[n] @@ -382,9 +378,14 @@ def update(self, names, prefix, suffix): for n, c in new.items(): cmd = self._build_cmd(c) - p = Popen(cmd) try: + p = Popen(cmd) p.wait(timeout=1) + except OSError as e: + logger.error("cannot start filter: " + str(e)) + c['status'] = psutil.STATUS_DEAD + errors.append({"filter": n, "error": "cannot start filter: {}".format(str(e))}) + continue except TimeoutExpired: if c['log_level'].lower() == "developer": logger.debug("Debug mode enabled. Ignoring timeout at process startup.") @@ -392,6 +393,8 @@ def update(self, names, prefix, suffix): logger.error("Error starting filter. Did not daemonize before timeout. Killing it.") p.kill() p.wait() + errors.append({"filter": n, "error": "Filter did not daemonize before timeout."}) + continue ret = Services._wait_process_ready(c) if ret: logger.error("Unable to update filter {}: {}".format(n, ret)) diff --git a/tests/manager_socket/update_test.py b/tests/manager_socket/update_test.py index d7f9ca9c..7ab28ede 100644 --- a/tests/manager_socket/update_test.py +++ b/tests/manager_socket/update_test.py @@ -1,6 +1,6 @@ import logging from time import sleep -from manager_socket.utils import requests, check_filter_files, PATH_CONF_FTEST, CONF_EMPTY, CONF_ONE, CONF_ONE_V2, CONF_THREE, CONF_THREE_V2, CONF_FTEST, CONF_FTEST_WRONG_CONF, REQ_MONITOR, REQ_UPDATE_EMPTY, REQ_UPDATE_ONE, REQ_UPDATE_TWO, REQ_UPDATE_THREE, REQ_UPDATE_NON_EXISTING, REQ_UPDATE_NO_FILTER, RESP_EMPTY, RESP_TEST_1, RESP_TEST_2, RESP_TEST_3, RESP_STATUS_OK, RESP_STATUS_KO, RESP_ERROR_FILTER_NOT_EXISTING +from manager_socket.utils import requests, check_filter_files, PATH_CONF_FTEST, CONF_EMPTY, CONF_ONE, CONF_ONE_V2, CONF_THREE, CONF_THREE_V2, CONF_THREE_V2_ALT, CONF_TWO_V2, CONF_FOUR_V2, CONF_FTEST, CONF_FTEST_WRONG_CONF, REQ_MONITOR, REQ_UPDATE_EMPTY, REQ_UPDATE_ONE, REQ_UPDATE_TWO, REQ_UPDATE_THREE, REQ_UPDATE_NON_EXISTING, REQ_UPDATE_NO_FILTER, RESP_EMPTY, RESP_TEST_1, RESP_TEST_2, RESP_TEST_3, RESP_TEST_4, RESP_STATUS_OK, RESP_STATUS_KO, RESP_ERROR_FILTER_NOT_EXISTING from tools.darwin_utils import darwin_configure, darwin_remove_configuration, darwin_start, darwin_stop from tools.output import print_result @@ -39,6 +39,9 @@ def run(): non_existing_filter, non_existing_filter_conf_v2, update_no_filter, + many_update_diff_one_more_v2, + many_update_diff_one_less_v2, + many_update_diff_one_more_one_less_v2, ] for i in tests: @@ -318,7 +321,7 @@ def many_filters_to_one(): logging.error("many_filters_to_one: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay darwin_configure(CONF_ONE) resp = requests(REQ_UPDATE_TWO) if RESP_STATUS_OK not in resp: @@ -350,7 +353,7 @@ def many_filters_to_one_conf_v2(): logging.error("many_filters_to_one: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay darwin_configure(CONF_ONE_V2) resp = requests(REQ_UPDATE_TWO) if RESP_STATUS_OK not in resp: @@ -382,13 +385,13 @@ def one_update_none(): logging.error("one_update_none: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_EMPTY) if RESP_STATUS_OK not in resp: logging.error("one_update_none: Update response error; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_MONITOR) if RESP_TEST_1 not in resp: logging.error("one_update_none: Mismatching monitor response; got \"{}\"".format(resp)) @@ -413,13 +416,13 @@ def one_update_none_conf_v2(): logging.error("one_update_none: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_EMPTY) if RESP_STATUS_OK not in resp: logging.error("one_update_none: Update response error; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_MONITOR) if RESP_TEST_1 not in resp: logging.error("one_update_none: Mismatching monitor response; got \"{}\"".format(resp)) @@ -444,13 +447,13 @@ def one_update_one(): logging.error("one_update_one: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(2) # Need this beacause of the starting delay + sleep(2) # Need this because of the starting delay resp = requests(REQ_UPDATE_ONE) if RESP_STATUS_OK not in resp: logging.error("one_update_one: Update response error; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_MONITOR) if RESP_TEST_1 not in resp: logging.error("one_update_one: Mismatching monitor response; got \"{}\"".format(resp)) @@ -475,13 +478,13 @@ def one_update_one_conf_v2(): logging.error("one_update_one: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(2) # Need this beacause of the starting delay + sleep(2) # Need this because of the starting delay resp = requests(REQ_UPDATE_ONE) if RESP_STATUS_OK not in resp: logging.error("one_update_one: Update response error; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_MONITOR) if RESP_TEST_1 not in resp: logging.error("one_update_one: Mismatching monitor response; got \"{}\"".format(resp)) @@ -576,7 +579,7 @@ def many_update_none(): logging.error("many_update_none: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_EMPTY) if RESP_STATUS_OK not in resp: logging.error("many_update_none: Update response error; got \"{}\"".format(resp)) @@ -606,7 +609,7 @@ def many_update_none_conf_v2(): logging.error("many_update_none: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_EMPTY) if RESP_STATUS_OK not in resp: logging.error("many_update_none: Update response error; got \"{}\"".format(resp)) @@ -636,7 +639,7 @@ def many_update_one(): logging.error("many_update_one: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_ONE) if RESP_STATUS_OK not in resp: logging.error("many_update_one: Update response error; got \"{}\"".format(resp)) @@ -666,7 +669,7 @@ def many_update_one_conf_v2(): logging.error("many_update_one: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_ONE) if RESP_STATUS_OK not in resp: logging.error("many_update_one: Update response error; got \"{}\"".format(resp)) @@ -695,7 +698,7 @@ def many_update_many(): logging.error("many_update_many: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_TWO) if RESP_STATUS_OK not in resp: logging.error("many_update_many: Update response error; got \"{}\"".format(resp)) @@ -724,7 +727,7 @@ def many_update_many_conf_v2(): logging.error("many_update_many: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_TWO) if RESP_STATUS_OK not in resp: logging.error("many_update_many: Update response error; got \"{}\"".format(resp)) @@ -918,7 +921,7 @@ def many_update_all(): logging.error("many_update_all: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_THREE) if RESP_STATUS_OK not in resp: logging.error("many_update_all: Update response error; got \"{}\"".format(resp)) @@ -948,7 +951,7 @@ def many_update_all_conf_v2(): logging.error("many_update_all: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_THREE) if RESP_STATUS_OK not in resp: logging.error("many_update_all: Update response error; got \"{}\"".format(resp)) @@ -978,7 +981,7 @@ def non_existing_filter(): logging.error("non_existing_filter: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_NON_EXISTING) if RESP_ERROR_FILTER_NOT_EXISTING not in resp: logging.error("non_existing_filter: Update response error; got \"{}\"".format(resp)) @@ -1008,7 +1011,7 @@ def non_existing_filter_conf_v2(): logging.error("non_existing_filter: Mismatching monitor response; got \"{}\"".format(resp)) ret = False - sleep(1) # Need this beacause of the starting delay + sleep(1) # Need this because of the starting delay resp = requests(REQ_UPDATE_NON_EXISTING) if RESP_ERROR_FILTER_NOT_EXISTING not in resp: logging.error("non_existing_filter: Update response error; got \"{}\"".format(resp)) @@ -1052,4 +1055,105 @@ def update_no_filter(): darwin_stop(process) darwin_remove_configuration() darwin_remove_configuration(path=PATH_CONF_FTEST) - return ret \ No newline at end of file + return ret + + +def many_update_diff_one_more_v2(): + + ret = True + + darwin_configure(CONF_THREE_V2) + darwin_configure(CONF_FTEST, path=PATH_CONF_FTEST) + process = darwin_start() + + resp = requests(REQ_MONITOR) + if not all(x in resp for x in [RESP_TEST_1, RESP_TEST_2, RESP_TEST_3]): + logging.error("many_update_diff_one_more_v2: Mismatching monitor response; got \"{}\"".format(resp)) + ret = False + + sleep(1) # Need this because of the starting delay + darwin_configure(CONF_FOUR_V2) + resp = requests(REQ_UPDATE_EMPTY) + if RESP_STATUS_OK not in resp: + logging.error("many_update_diff_one_more_v2: Update response error; got \"{}\"".format(resp)) + ret = False + + resp = requests(REQ_MONITOR) + if not all(x in resp for x in [RESP_TEST_1, RESP_TEST_2, RESP_TEST_3, RESP_TEST_4]): + logging.error("many_update_diff_one_more_v2: Mismatching monitor response; got \"{}\"".format(resp)) + ret = False + + darwin_stop(process) + darwin_remove_configuration() + darwin_remove_configuration(path=PATH_CONF_FTEST) + return ret + + +def many_update_diff_one_less_v2(): + + ret = True + + darwin_configure(CONF_THREE_V2) + darwin_configure(CONF_FTEST, path=PATH_CONF_FTEST) + process = darwin_start() + + resp = requests(REQ_MONITOR) + if not all(x in resp for x in [RESP_TEST_1, RESP_TEST_2, RESP_TEST_3]): + logging.error("many_update_diff_one_less_v2: Mismatching monitor response; got \"{}\"".format(resp)) + ret = False + + sleep(1) # Need this because of the starting delay + darwin_configure(CONF_TWO_V2) + resp = requests(REQ_UPDATE_EMPTY) + if RESP_STATUS_OK not in resp: + logging.error("many_update_diff_one_less_v2: Update response error; got \"{}\"".format(resp)) + ret = False + + resp = requests(REQ_MONITOR) + if not all(x in resp for x in [RESP_TEST_1, RESP_TEST_2]): + logging.error("many_update_diff_one_less_v2: Mismatching monitor response; got \"{}\"".format(resp)) + ret = False + + if RESP_TEST_3 in resp: + logging.error('many_update_diff_one_less_v2: Too much filters in monitoring response; got "{}"'.format(resp)) + ret = False + + darwin_stop(process) + darwin_remove_configuration() + darwin_remove_configuration(path=PATH_CONF_FTEST) + return ret + + +def many_update_diff_one_more_one_less_v2(): + + ret = True + + darwin_configure(CONF_THREE_V2) + darwin_configure(CONF_FTEST, path=PATH_CONF_FTEST) + process = darwin_start() + + resp = requests(REQ_MONITOR) + if not all(x in resp for x in [RESP_TEST_1, RESP_TEST_2, RESP_TEST_3]): + logging.error("many_update_diff_one_more_one_less_v2: Mismatching monitor response; got \"{}\"".format(resp)) + ret = False + + sleep(1) # Need this because of the starting delay + darwin_configure(CONF_THREE_V2_ALT) + resp = requests(REQ_UPDATE_EMPTY) + if RESP_STATUS_OK not in resp: + logging.error("many_update_diff_one_more_one_less_v2: Update response error; got \"{}\"".format(resp)) + ret = False + + resp = requests(REQ_MONITOR) + if not all(x in resp for x in [RESP_TEST_1, RESP_TEST_2, RESP_TEST_4]): + logging.error("many_update_diff_one_more_one_less_v2: Mismatching monitor response; got \"{}\"".format(resp)) + ret = False + + if RESP_TEST_3 in resp: + logging.error('many_update_diff_one_more_one_less_v2: Wrong filter in monitor response; got "{}"'.format(resp)) + ret = False + + darwin_stop(process) + darwin_remove_configuration() + darwin_remove_configuration(path=PATH_CONF_FTEST) + return ret diff --git a/tests/manager_socket/utils.py b/tests/manager_socket/utils.py index 5345942e..9c459045 100644 --- a/tests/manager_socket/utils.py +++ b/tests/manager_socket/utils.py @@ -193,6 +193,135 @@ def check_filter_files(filter_name, extension=".1"): }} }} """.format(DEFAULT_FILTER_PATH, PATH_CONF_FTEST) +CONF_THREE_V2_ALT = """{{ + "version": 2, + "filters": [ + {{ + "name": "test_1", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }}, + {{ + "name": "test_2", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }}, + {{ + "name": "test_4", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }} + ], + "report_stats": {{ + "file": {{ + "filepath": "/tmp/darwin-stats", + "permissions": 640 + }}, + "interval": 5 + }} +}} +""".format(DEFAULT_FILTER_PATH, PATH_CONF_FTEST) +CONF_TWO_V2 = """{{ + "version": 2, + "filters": [ + {{ + "name": "test_1", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }}, + {{ + "name": "test_2", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }} + ], + "report_stats": {{ + "file": {{ + "filepath": "/tmp/darwin-stats", + "permissions": 640 + }}, + "interval": 5 + }} +}} +""".format(DEFAULT_FILTER_PATH, PATH_CONF_FTEST) +CONF_FOUR_V2 = """{{ + "version": 2, + "filters": [ + {{ + "name": "test_1", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }}, + {{ + "name": "test_2", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }}, + {{ + "name": "test_3", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }}, + {{ + "name": "test_4", + "exec_path": "{0}darwin_test", + "config_file": "{1}", + "output": "NONE", + "next_filter": "", + "nb_thread": 1, + "log_level": "DEBUG", + "cache_size": 0 + }} + ], + "report_stats": {{ + "file": {{ + "filepath": "/tmp/darwin-stats", + "permissions": 640 + }}, + "interval": 5 + }} +}} +""".format(DEFAULT_FILTER_PATH, PATH_CONF_FTEST) CONF_THREE_ONE_WRONG = """{{ "test_1": {{ "exec_path": "{0}darwin_test", @@ -293,6 +422,7 @@ def check_filter_files(filter_name, extension=".1"): RESP_TEST_1 = '"test_1": {"status": "running", "connections": 0, "received": 0, "entryErrors": 0, "matches": 0, "failures": 0, "proc_stats": {' RESP_TEST_2 = '"test_2": {"status": "running", "connections": 0, "received": 0, "entryErrors": 0, "matches": 0, "failures": 0, "proc_stats": {' RESP_TEST_3 = '"test_3": {"status": "running", "connections": 0, "received": 0, "entryErrors": 0, "matches": 0, "failures": 0, "proc_stats": {' +RESP_TEST_4 = '"test_4": {"status": "running", "connections": 0, "received": 0, "entryErrors": 0, "matches": 0, "failures": 0, "proc_stats": {' RESP_STATUS_OK = '"status": "OK"' RESP_STATUS_KO = '"status": "KO"' RESP_ERROR_NO_PID = '"error": "PID file not accessible"'