From 748ac585392ed22102a5a57125605d5b5dda4bc4 Mon Sep 17 00:00:00 2001 From: Fu Hanxi Date: Wed, 22 Jan 2025 12:59:23 +0100 Subject: [PATCH] fix: read all chunks in queue since we have interval in listener --- pytest-embedded/pytest_embedded/dut_factory.py | 11 ++++++----- pytest-embedded/pytest_embedded/log.py | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pytest-embedded/pytest_embedded/dut_factory.py b/pytest-embedded/pytest_embedded/dut_factory.py index 5e2533ed..c327d492 100644 --- a/pytest-embedded/pytest_embedded/dut_factory.py +++ b/pytest-embedded/pytest_embedded/dut_factory.py @@ -52,15 +52,16 @@ def msg_queue_gn() -> MessageQueue: def _listen(q: MessageQueue, filepath: str, with_timestamp: bool = True, count: int = 1, total: int = 1) -> None: _added_prefix = False while True: - msg = q.get() - if not msg: + msgs = q.get_all() + if not msgs: continue + msg_b = b''.join(msgs) with open(filepath, 'ab') as fw: - fw.write(msg) + fw.write(msg_b) fw.flush() - _s = to_str(msg) + _s = to_str(msg_b) if not _s: continue @@ -87,7 +88,7 @@ def _listen(q: MessageQueue, filepath: str, with_timestamp: bool = True, count: _stdout.write(_s) _stdout.flush() - time.sleep(0.1) + time.sleep(0.05) def _listener_gn(msg_queue, _pexpect_logfile, with_timestamp, dut_index, dut_total) -> multiprocessing.Process: diff --git a/pytest-embedded/pytest_embedded/log.py b/pytest-embedded/pytest_embedded/log.py index c82c9311..76c6b7fe 100644 --- a/pytest-embedded/pytest_embedded/log.py +++ b/pytest-embedded/pytest_embedded/log.py @@ -8,6 +8,7 @@ import textwrap import uuid from multiprocessing import queues +from queue import Empty from typing import AnyStr, List, Optional, Union import pexpect.fdpexpect @@ -26,6 +27,8 @@ class MessageQueue(queues.Queue): def __init__(self, *args, **kwargs): if 'ctx' not in kwargs: kwargs['ctx'] = _ctx + + self.lock = _ctx.Lock() super().__init__(*args, **kwargs) def put(self, obj, **kwargs): @@ -42,6 +45,17 @@ def put(self, obj, **kwargs): except: # noqa # queue might be closed pass + def get_all(self) -> List[bytes]: + res = [] + with self.lock: + while True: + try: + res.append(self.get_nowait()) + except Empty: + break + + return res + def write(self, s: AnyStr): self.put(s)