Skip to content

Commit

Permalink
Tiny mechanical cleanups
Browse files Browse the repository at this point in the history
Change-Id: I3c436dd7ce7d1743a4df915f2f02ad08cc310692
  • Loading branch information
spt29 committed Feb 21, 2025
1 parent f5670ba commit 294f68a
Showing 1 changed file with 42 additions and 52 deletions.
94 changes: 42 additions & 52 deletions cmk/rrd/rrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,25 +728,19 @@ def __init__(self, rrd_interface: RRDInterface):
def create_rrds_keepalive(self, config_class: type[RRDConfig]) -> None:
input_buffer = b""
self._rrd_helper_output_buffer = b""

job_queue: list[bytes] = []

job_queue = list[bytes]()
console.verbose("Started Check_MK RRD creator.")
try:
# We read asynchronously from stdin and put the jobs into a queue.
# That way the cmc main process will not be blocked by IO wait.
while True:
if job_queue:
timeout: int | None = 0
else:
timeout = None

if self._rrd_helper_output_buffer:
wait_for_write = [1]
else:
wait_for_write = []
readable, writeable = select.select(
[0],
[1] if self._rrd_helper_output_buffer else [],
[],
0 if job_queue else None,
)[:-1]

readable, writeable = select.select([0], wait_for_write, [], timeout)[:-1]
if 1 in writeable:
self._write_rrd_helper_response()

Expand All @@ -755,56 +749,22 @@ def create_rrds_keepalive(self, config_class: type[RRDConfig]) -> None:
new_bytes = os.read(0, 4096)
except Exception:
new_bytes = b""

if not new_bytes and not job_queue:
console.verbose("Core closed stdin, all jobs finished. Exiting.")
break

input_buffer += new_bytes
parts: list[bytes] = input_buffer.split(b"\n")
if parts[-1] != b"": # last job not terminated
input_buffer = parts[-1]
else:
input_buffer = b""

parts = parts[:-1]
job_queue += parts
parts = (input_buffer + new_bytes).split(b"\n")
job_queue += parts[:-1]
input_buffer = parts[-1]

# Create *one* RRD file
if job_queue:
if job_queue[0] == b"*":
# Obsolete. We can't reload the config explicitly, as this is done already
# by core config generation.
console.verbose("Reloading configuration.")
else:
spec = job_queue[0].decode("utf-8")
parsed_spec = RRDSpec.parse(spec)
config = config_class(parsed_spec.host)
try:
self._create_rrd_from_spec(config, parsed_spec)
except self._rrd_interface.OperationalError as exc:
self._queue_rrd_helper_response(f"Error creating RRD: {exc!s}")
except OSError as exc:
self._queue_rrd_helper_response(f"Error creating RRD: {exc.strerror}")
except Exception as e:
if cmk.ccc.debug.enabled():
raise
crash = CMKBaseCrashReport.from_exception(
paths.crash_dir, get_general_version_infos(paths.omd_root)
)
CrashReportStore().save(crash)
self._queue_rrd_helper_response(
f"Error creating RRD for {spec}: {str(e) or traceback.format_exc()}"
)
self._handle_job(job_queue[0].decode("utf-8"), config_class)
del job_queue[0]

except Exception:
if cmk.ccc.debug.enabled():
raise
crash = CMKBaseCrashReport.from_exception(
paths.crash_dir, get_general_version_infos(paths.omd_root)
)
CrashReportStore().save(crash)
create_crash_report()
self._queue_rrd_helper_response(
f"Check_MK RRD creator failed: {traceback.format_exc()}"
)
Expand All @@ -816,6 +776,28 @@ def _write_rrd_helper_response(self) -> None:
written = os.write(1, self._rrd_helper_output_buffer[:size])
self._rrd_helper_output_buffer = self._rrd_helper_output_buffer[written:]

def _handle_job(self, spec: str, config_class: type[RRDConfig]) -> None:
if spec == "*":
# Obsolete. We can't reload the config explicitly, as this is done already
# by core config generation.
console.verbose("Reloading configuration.")
else:
parsed_spec = RRDSpec.parse(spec)
config = config_class(parsed_spec.host)
try:
self._create_rrd_from_spec(config, parsed_spec)
except self._rrd_interface.OperationalError as exc:
self._queue_rrd_helper_response(f"Error creating RRD: {exc!s}")
except OSError as exc:
self._queue_rrd_helper_response(f"Error creating RRD: {exc.strerror}")
except Exception as e:
if cmk.ccc.debug.enabled():
raise
create_crash_report()
self._queue_rrd_helper_response(
f"Error creating RRD for {spec}: {str(e) or traceback.format_exc()}"
)

def _create_rrd_from_spec(self, config: RRDConfig, spec: RRDSpec) -> None:
rrd_file_name = _create_rrd(
self._rrd_interface, config, spec, self._queue_rrd_helper_response
Expand Down Expand Up @@ -844,6 +826,14 @@ def _queue_rrd_helper_response(self, response: str) -> None:
self._rrd_helper_output_buffer += (response + "\n").encode("utf-8")


def create_crash_report() -> None:
CrashReportStore().save(
CMKBaseCrashReport.from_exception(
paths.crash_dir, get_general_version_infos(paths.omd_root)
)
)


def _float_or_nan(s: str | None) -> str:
if s is None:
return "U"
Expand Down

0 comments on commit 294f68a

Please sign in to comment.