Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve manager during updates and fix deadlock during restart before full start #193

Merged
merged 5 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions manager/Administration.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ def run(self, services):
try:
cli = JsonSocket(_cli)
cmd = cli.recv()
except json.JSONDecodeError as e:
logger.error("Error decoding message from admin: {}".format(e))
try:
cli.send({
'status': 'KO',
'errors': [str(e)]
})
except Exception as e:
logger.critical("Error while trying to reply to admin: {}".format(e))
continue
except Exception as e:
logger.error("Error receiving data from admin: {0}".format(e))
continue
Expand Down
65 changes: 34 additions & 31 deletions manager/Services.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,17 @@ def start_all(self):
"""
Start all the filters.
"""
with self._lock:
for _, filter in self._filters.items():
if self.start_one(filter, True):
ret = Services._wait_process_ready(filter)
if ret:
logger.error("Error when starting filter {}: {}".format(filter['name'], ret))
self.stop_one(filter, no_lock=True)
self.clean_one(filter, no_lock=True)
else:
logger.debug("Linking UNIX sockets...")
filter['status'] = psutil.STATUS_RUNNING
call(['ln', '-s', filter['socket'], filter['socket_link']])
for _, filter in self._filters.items():
if self.start_one(filter, True):
ret = Services._wait_process_ready(filter)
if ret:
logger.error("Error when starting filter {}: {}".format(filter['name'], ret))
self.stop_one(filter, no_lock=True)
self.clean_one(filter, no_lock=True)
else:
logger.debug("Linking UNIX sockets...")
filter['status'] = psutil.STATUS_RUNNING
call(['ln', '-s', filter['socket'], filter['socket_link']])

def rotate_logs_all(self):
"""
Expand All @@ -70,24 +69,14 @@ def stop_all(self):
"""
Stop all the filters
"""
logger.debug("stop_all: before lock")
with self._lock:
logger.debug("stop_all: after lock")
for _, filter in self._filters.items():
try:
self.stop_one(filter, True)
logger.debug("stop_all: after stop_one")
self.clean_one(filter, True)
logger.debug("stop_all: after clean_one")
except Exception:
pass

def restart_all(self):
"""
Restart all the filters
"""
self.stop_all()
self.start_all()
for _, filter in self._filters.items():
try:
self.stop_one(filter, True)
logger.debug("stop_all: after stop_one")
self.clean_one(filter, True)
logger.debug("stop_all: after clean_one")
except Exception:
pass

@staticmethod
def _build_cmd(filt):
Expand Down Expand Up @@ -340,9 +329,16 @@ def update(self, names, prefix, suffix):

logger.info("Update: Configuration loaded")

if not names:
# Compute the symmetric difference of two sets: the keys of the current filters' dict and the keys of the dict loaded from conf,
# then unpack the result into a list.
# This yields the names of the new and deleted filters only, i.e. a diff of the configured filters.
names = [*(set(self._filters.keys()) ^ set(conf_filters.keys()))]

with self._lock:
errors = []
new = {}
logger.info("updating filters {}".format(names))
for n in names:
try:
new[n] = conf_filters[n]
Expand Down Expand Up @@ -382,16 +378,23 @@ def update(self, names, prefix, suffix):

for n, c in new.items():
cmd = self._build_cmd(c)
p = Popen(cmd)
try:
p = Popen(cmd)
p.wait(timeout=1)
except OSError as e:
logger.error("cannot start filter: " + str(e))
c['status'] = psutil.STATUS_DEAD
errors.append({"filter": n, "error": "cannot start filter: {}".format(str(e))})
continue
except TimeoutExpired:
if c['log_level'].lower() == "developer":
logger.debug("Debug mode enabled. Ignoring timeout at process startup.")
else:
logger.error("Error starting filter. Did not daemonize before timeout. Killing it.")
p.kill()
p.wait()
errors.append({"filter": n, "error": "Filter did not daemonize before timeout."})
continue
ret = Services._wait_process_ready(c)
if ret:
logger.error("Unable to update filter {}: {}".format(n, ret))
Expand Down
Loading