From ff3d4c322a2ddbfa661c6a8d8d12be3b6bd6d108 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Fors?= Date: Mon, 28 Oct 2024 11:40:58 +0100 Subject: [PATCH 1/4] Use .insert to be consistent --- modal/_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modal/_telemetry.py b/modal/_telemetry.py index 42859aea5..f0c8e5b87 100644 --- a/modal/_telemetry.py +++ b/modal/_telemetry.py @@ -139,7 +139,7 @@ def _send(self): logger.debug(f"failed to send event: {e}") def install(self): - sys.meta_path = [self] + sys.meta_path # type: ignore + sys.meta_path.insert(0, self) # type: ignore def remove(self): sys.meta_path.remove(self) # type: ignore From 2c950155d07c82ffd14731bc6f42042eaed779b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Fors?= Date: Mon, 28 Oct 2024 11:41:48 +0100 Subject: [PATCH 2/4] Allow to import specific library --- test/telemetry_test.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/telemetry_test.py b/test/telemetry_test.py index 002bbaf1e..ee4eeb10f 100644 --- a/test/telemetry_test.py +++ b/test/telemetry_test.py @@ -122,22 +122,24 @@ def test_import_tracing(monkeypatch): # For manual testing -def generate_import_telemetry(telemetry_socket): +def generate_import_telemetry(import_name, telemetry_socket): instrument_imports(telemetry_socket) t0 = time.monotonic() - import kubernetes # noqa + __import__(import_name) return time.monotonic() - t0 # For manual testing def main(): + import_name = sys.argv[1] if len(sys.argv) >= 2 else "kubernetes" telemetry_socket = os.environ.get("MODAL_TELEMETRY_SOCKET") if telemetry_socket: - latency = generate_import_telemetry(telemetry_socket) + latency = generate_import_telemetry(import_name, telemetry_socket) else: with TelemetryConsumer() as consumer: - latency = generate_import_telemetry(consumer.socket_filename.absolute().as_posix()) + telemetry_socket = consumer.socket_filename.absolute().as_posix() + latency = generate_import_telemetry(import_name, telemetry_socket) while True: try: @@ -146,7 +148,7 @@ def main(): except queue.Empty: break - print(f"import kubernetes took {latency:.02}s") + print(f"'import {import_name}' took {latency:.02}s") if __name__ == "__main__": From ba41e3f4d31086ca3db633c06a400cbbafefdc99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Fors?= Date: Mon, 28 Oct 2024 13:57:18 +0100 Subject: [PATCH 3/4] Emit load start and end together --- modal/_telemetry.py | 77 +++++++++++++++++++++++------------------- test/telemetry_test.py | 2 +- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/modal/_telemetry.py b/modal/_telemetry.py index f0c8e5b87..ddfb8a8cb 100644 --- a/modal/_telemetry.py +++ b/modal/_telemetry.py @@ -65,7 +65,7 @@ def get_resource_reader(self, fullname: str): class ImportInterceptor(importlib.abc.MetaPathFinder): - loading: typing.Dict[str, typing.Tuple[str, float]] + loading: typing.Dict[str, float] tracing_socket: socket.socket events: queue.Queue @@ -78,7 +78,7 @@ def connect(cls, socket_filename: str) -> "ImportInterceptor": def __init__(self, tracing_socket: socket.socket): self.loading = {} self.tracing_socket = tracing_socket - self.events = queue.Queue(maxsize=16 * 1024) + self.events = queue.Queue(maxsize=8 * 1024) sender = threading.Thread(target=self._send, daemon=True) sender.start() @@ -95,48 +95,57 @@ def find_spec(self, fullname, path, target=None): def load_start(self, name): t0 = time.monotonic() - span_id = str(uuid.uuid4()) - self.emit( - {"span_id": span_id, "timestamp": time.time(), "event": MODULE_LOAD_START, "attributes": {"name": name}} - ) - self.loading[name] = (span_id, t0) + self.loading[name] = t0 def load_end(self, name): - span_id, t0 = self.loading.pop(name, (None, None)) + t1 = time.monotonic() + timestamp1 = time.time() + + t0 = self.loading.pop(name, None) if t0 is None: return - latency = time.monotonic() - t0 - self.emit( - { - "span_id": span_id, - "timestamp": time.time(), - "event": MODULE_LOAD_END, - "attributes": { - "name": name, - "latency": latency, - }, - } - ) - - def emit(self, event): + + span_id = str(uuid.uuid4()) + latency = t1 - t0 + timestamp0 = timestamp1 - latency + + event_start = { + "span_id": span_id, + "timestamp": timestamp0, + "event": MODULE_LOAD_START, + "attributes": {"name": name}, + } + event_end = { + "span_id": span_id, + "timestamp": timestamp1, + "event": MODULE_LOAD_END, + "attributes": { + "name": name, + "latency": latency, + }, + } + self.emit((event_start, event_end)) + + def emit(self, events): try: - self.events.put_nowait(event) + self.events.put_nowait(events) except queue.Full: logger.debug("failed to emit event: queue full") def _send(self): while True: - event = self.events.get() - try: - msg = json.dumps(event).encode("utf-8") - except BaseException as e: - logger.debug(f"failed to serialize event: {e}") - continue - try: - encoded_len = pack(MESSAGE_HEADER_FORMAT, len(msg)) - self.tracing_socket.send(encoded_len + msg) - except OSError as e: - logger.debug(f"failed to send event: {e}") + events = self.events.get() + for event in events: + try: + msg = json.dumps(event).encode("utf-8") + except BaseException as e: + logger.debug(f"failed to serialize event: {e}") + continue + try: + encoded_len = pack(MESSAGE_HEADER_FORMAT, len(msg)) + self.tracing_socket.send(encoded_len + msg) + except OSError as e: + logger.debug(f"failed to send event: {e}") def install(self): sys.meta_path.insert(0, self) # type: ignore diff --git a/test/telemetry_test.py b/test/telemetry_test.py index ee4eeb10f..fa6b0351a 100644 --- a/test/telemetry_test.py +++ b/test/telemetry_test.py @@ -102,9 +102,9 @@ def test_import_tracing(monkeypatch): from .telemetry import tracing_module_1 # noqa expected_messages: list[typing.Dict[str, typing.Any]] = [ - {"event": "module_load_start", "attributes": {"name": "test.telemetry.tracing_module_1"}}, {"event": "module_load_start", "attributes": {"name": "test.telemetry.tracing_module_2"}}, {"event": "module_load_end", "attributes": {"name": "test.telemetry.tracing_module_2"}}, + {"event": "module_load_start", "attributes": {"name": "test.telemetry.tracing_module_1"}}, {"event": "module_load_end", "attributes": {"name": "test.telemetry.tracing_module_1"}}, ] From 170372f08f266c65a1122e7ef53f2e1697c6c87d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Fors?= Date: Mon, 28 Oct 2024 14:45:46 +0100 Subject: [PATCH 4/4] Fix race in telemetry test --- test/telemetry_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/telemetry_test.py b/test/telemetry_test.py index fa6b0351a..e754002d0 100644 --- a/test/telemetry_test.py +++ b/test/telemetry_test.py @@ -62,9 +62,9 @@ def _listen(self): while not self.stopped: try: conn, _ = self.server.accept() + self.connections.add(conn) receiver = threading.Thread(target=self._recv, args=(conn,), daemon=True) receiver.start() - self.connections.add(conn) except OSError as e: logging.debug(f"listener got exception, exiting: {e}") return