diff --git a/cmk/rrd/rrd.py b/cmk/rrd/rrd.py index fef332599d2..c53a3f92847 100644 --- a/cmk/rrd/rrd.py +++ b/cmk/rrd/rrd.py @@ -728,25 +728,19 @@ def __init__(self, rrd_interface: RRDInterface): def create_rrds_keepalive(self, config_class: type[RRDConfig]) -> None: input_buffer = b"" self._rrd_helper_output_buffer = b"" - - job_queue: list[bytes] = [] - + job_queue = list[bytes]() console.verbose("Started Check_MK RRD creator.") try: # We read asynchronously from stdin and put the jobs into a queue. # That way the cmc main process will not be blocked by IO wait. while True: - if job_queue: - timeout: int | None = 0 - else: - timeout = None - - if self._rrd_helper_output_buffer: - wait_for_write = [1] - else: - wait_for_write = [] + readable, writeable = select.select( + [0], + [1] if self._rrd_helper_output_buffer else [], + [], + 0 if job_queue else None, + )[:-1] - readable, writeable = select.select([0], wait_for_write, [], timeout)[:-1] if 1 in writeable: self._write_rrd_helper_response() @@ -755,56 +749,22 @@ def create_rrds_keepalive(self, config_class: type[RRDConfig]) -> None: new_bytes = os.read(0, 4096) except Exception: new_bytes = b"" - if not new_bytes and not job_queue: console.verbose("Core closed stdin, all jobs finished. Exiting.") break - - input_buffer += new_bytes - parts: list[bytes] = input_buffer.split(b"\n") - if parts[-1] != b"": # last job not terminated - input_buffer = parts[-1] - else: - input_buffer = b"" - - parts = parts[:-1] - job_queue += parts + parts = (input_buffer + new_bytes).split(b"\n") + job_queue += parts[:-1] + input_buffer = parts[-1] # Create *one* RRD file if job_queue: - if job_queue[0] == b"*": - # Obsolete. We can't reload the config explicitly, as this is done already - # by core config generation. - console.verbose("Reloading configuration.") - else: - spec = job_queue[0].decode("utf-8") - parsed_spec = RRDSpec.parse(spec) - config = config_class(parsed_spec.host) - try: - self._create_rrd_from_spec(config, parsed_spec) - except self._rrd_interface.OperationalError as exc: - self._queue_rrd_helper_response(f"Error creating RRD: {exc!s}") - except OSError as exc: - self._queue_rrd_helper_response(f"Error creating RRD: {exc.strerror}") - except Exception as e: - if cmk.ccc.debug.enabled(): - raise - crash = CMKBaseCrashReport.from_exception( - paths.crash_dir, get_general_version_infos(paths.omd_root) - ) - CrashReportStore().save(crash) - self._queue_rrd_helper_response( - f"Error creating RRD for {spec}: {str(e) or traceback.format_exc()}" - ) + self._handle_job(job_queue[0].decode("utf-8"), config_class) del job_queue[0] except Exception: if cmk.ccc.debug.enabled(): raise - crash = CMKBaseCrashReport.from_exception( - paths.crash_dir, get_general_version_infos(paths.omd_root) - ) - CrashReportStore().save(crash) + create_crash_report() self._queue_rrd_helper_response( f"Check_MK RRD creator failed: {traceback.format_exc()}" ) @@ -816,6 +776,28 @@ def _write_rrd_helper_response(self) -> None: written = os.write(1, self._rrd_helper_output_buffer[:size]) self._rrd_helper_output_buffer = self._rrd_helper_output_buffer[written:] + def _handle_job(self, spec: str, config_class: type[RRDConfig]) -> None: + if spec == "*": + # Obsolete. We can't reload the config explicitly, as this is done already + # by core config generation. + console.verbose("Reloading configuration.") + else: + parsed_spec = RRDSpec.parse(spec) + config = config_class(parsed_spec.host) + try: + self._create_rrd_from_spec(config, parsed_spec) + except self._rrd_interface.OperationalError as exc: + self._queue_rrd_helper_response(f"Error creating RRD: {exc!s}") + except OSError as exc: + self._queue_rrd_helper_response(f"Error creating RRD: {exc.strerror}") + except Exception as e: + if cmk.ccc.debug.enabled(): + raise + create_crash_report() + self._queue_rrd_helper_response( + f"Error creating RRD for {spec}: {str(e) or traceback.format_exc()}" + ) + def _create_rrd_from_spec(self, config: RRDConfig, spec: RRDSpec) -> None: rrd_file_name = _create_rrd( self._rrd_interface, config, spec, self._queue_rrd_helper_response @@ -844,6 +826,14 @@ def _queue_rrd_helper_response(self, response: str) -> None: self._rrd_helper_output_buffer += (response + "\n").encode("utf-8") +def create_crash_report() -> None: + CrashReportStore().save( + CMKBaseCrashReport.from_exception( + paths.crash_dir, get_general_version_infos(paths.omd_root) + ) + ) + + def _float_or_nan(s: str | None) -> str: if s is None: return "U"