Skip to content

Commit

Permalink
Rename confusingly overlapping variable name
Browse files Browse the repository at this point in the history
  • Loading branch information
rohansingh committed Jan 9, 2025
1 parent 4539513 commit 8d25bea
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions modal/_runtime/container_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ async def put_data_out(
function_call_id: str,
start_index: int,
data_format: int,
messages_bytes: list[Any],
serialized_messages: list[Any],
) -> None:
"""Put data onto the `data_out` stream of a function call.
Expand All @@ -511,7 +511,7 @@ async def put_data_out(
still use the previous Postgres-backed system based on `FunctionPutOutputs()`.
"""
data_chunks: list[api_pb2.DataChunk] = []
for i, message_bytes in enumerate(messages_bytes):
for i, message_bytes in enumerate(serialized_messages):
chunk = api_pb2.DataChunk(data_format=data_format, index=start_index + i) # type: ignore
if len(message_bytes) > MAX_OBJECT_SIZE_BYTES:
chunk.data_blob_id = await blob_upload(message_bytes, self._client.stub)
Expand All @@ -534,8 +534,8 @@ async def generator_output_task(self, function_call_id: str, data_format: int, m
# If we don't sleep here for 1ms we end up with an extra call to .put_data_out().
if index == 1:
await asyncio.sleep(0.001)
messages_bytes = [serialize_data_format(message, data_format)]
total_size = len(messages_bytes[0]) + 512
serialized_messages = [serialize_data_format(message, data_format)]
total_size = len(serialized_messages[0]) + 512
while total_size < 16 * 1024 * 1024: # 16 MiB, maximum size in a single message
try:
message = message_rx.get_nowait()
Expand All @@ -545,10 +545,10 @@ async def generator_output_task(self, function_call_id: str, data_format: int, m
received_sentinel = True
break
else:
messages_bytes.append(serialize_data_format(message, data_format))
total_size += len(messages_bytes[-1]) + 512 # 512 bytes for estimated framing overhead
await self.put_data_out(function_call_id, index, data_format, messages_bytes)
index += len(messages_bytes)
serialized_messages.append(serialize_data_format(message, data_format))
total_size += len(serialized_messages[-1]) + 512 # 512 bytes for estimated framing overhead
await self.put_data_out(function_call_id, index, data_format, serialized_messages)
index += len(serialized_messages)

async def _queue_create(self, size: int) -> asyncio.Queue:
"""Create a queue, on the synchronicity event loop (needed on Python 3.8 and 3.9)."""
Expand Down

0 comments on commit 8d25bea

Please sign in to comment.