Skip to content

Commit

Permalink
Merge branch 'main' into cathy/flexible-ci-adv
Browse files Browse the repository at this point in the history
  • Loading branch information
gongy authored Sep 16, 2024
2 parents 6467099 + 3648fae commit 8ff2435
Show file tree
Hide file tree
Showing 50 changed files with 2,258 additions and 341 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ jobs:
run: |
python -m venv venv
source venv/bin/activate
pip install grpcio-tools==1.59.2
pip install grpcio-tools==1.59.2 grpclib==0.4.7
python -m grpc_tools.protoc --python_out=. --grpclib_python_out=. --grpc_python_out=. -I . modal_proto/api.proto modal_proto/options.proto
python -m grpc_tools.protoc --plugin=protoc-gen-modal-grpclib-python=protoc_plugin/plugin.py --modal-grpclib-python_out=. -I . modal_proto/api.proto modal_proto/options.proto
deactivate
- name: Check entrypoint import
Expand Down
69 changes: 65 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,69 @@ We appreciate your patience while we speedily work towards a stable release of t

<!-- NEW CONTENT GENERATED BELOW. PLEASE PRESERVE THIS COMMENT. -->

### 0.64.112 (2024-09-15)

- Creating sandboxes without an associated `App` is deprecated. If you are spawning a `Sandbox` outside a Modal container, you can lookup an `App` by name to attach to the `Sandbox`:

```python
app = modal.App.lookup('my-app', create_if_missing=True)
modal.Sandbox.create('echo', 'hi', app=app)
```



### 0.64.109 (2024-09-13)

App handles can now be looked up by name with `modal.App.lookup(name)`. This can be useful for associating sandboxes with apps:

```python
app = modal.App.lookup("my-app", create_if_missing=True)
modal.Sandbox.create("echo", "hi", app=app)
```



### 0.64.100 (2024-09-11)

* Default timeout for `modal.Image.run_function` is now 1 hour. Previously it was 24 hours.



### 0.64.99 (2024-09-11)

* Fixes an issue that could cause containers using `enable_memory_snapshot=True` on Python 3.9 and below to shut down prematurely




### 0.64.87 (2024-09-05)

Sandboxes now support port tunneling. Ports can be exposed via the `open_ports` argument, and a list of active tunnels can be retrieved via the `.tunnels()` method.



### 0.64.67 (2024-08-30)

- Fix a regression in `modal launch` behavior not showing progress output when starting the container.


### 0.64.48 (2024-08-21)

- Introduces new dataclass-style syntax for class parameterization (see updated [docs](https://modal.com/docs/guide/parameterized-functions))

```python
@app.cls()
class MyCls:
param_a: str = modal.parameter()

MyCls(param_a="hello") # synthesized constructor
```

- The new syntax enforces types (`str` or `int` for now) on all parameters

- *When the new syntax is used*, any web endpoints (`web_endpoint`, `asgi_app`, `wsgi_app` or `web_server`) on the app will now also support parameterization through the use of query parameters matching the parameter names, e.g. `https://myfunc.modal.run/?param_a="hello` in the above example.
- The old explicit `__init__` constructor syntax is still allowed, but could be deprecated in the future and doesn't work with web endpoint parameterization

### 0.64.38 (2024-08-16)

Expand All @@ -33,6 +91,9 @@ We appreciate your patience while we speedily work towards a stable release of t
Accordingly, the explicit `--name` option has been deprecated. Providing a name that can be confused with an App ID will also now raise an error.


### 0.64.32 (2024-08-16)

- Updated type stubs using generics to allow static type inferrence for functions calls, e.g. `function.remote(...)`.

### 0.64.26 (2024-08-15)

Expand All @@ -44,7 +105,7 @@ We appreciate your patience while we speedily work towards a stable release of t

- Added support for dynamic batching. Functions or class methods decorated with `@modal.batched` will now automatically batch their invocations together, up to a specified `max_batch_size`. The batch will wait for a maximum of `wait_ms` for more invocations after the first invocation is made. See guide for more details.

```
```python
@app.function()
@modal.batched(max_batch_size=4, wait_ms=1000)
async def batched_multiply(xs: list[int], ys: list[int]) -> list[int]:
Expand All @@ -58,7 +119,7 @@ We appreciate your patience while we speedily work towards a stable release of t
```

The batched function is called with individual inputs:
```
```python
await batched_multiply.remote.aio(2, 3)
```

Expand Down Expand Up @@ -422,10 +483,10 @@ We appreciate your patience while we speedily work towards a stable release of t

- Added `Dict.delete` and `Queue.delete` as API methods for deleting named storage objects:

```
```python
import modal
modal.Queue.delete("my-job-queue")
```
```
- Deprecated invoking `Volume.delete` as an instance method; it should now be invoked as a static method with the name of the Volume to delete, as with the other methods.


Expand Down
10 changes: 9 additions & 1 deletion modal/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ def main():
if config.get("traceback"):
raise

assert exc.__cause__ # We should always raise this class from another error
tb = reduce_traceback_to_user_code(exc.__cause__.__traceback__, exc.user_source)
sys.excepthook(type(exc.__cause__), exc.__cause__, tb)
sys.exit(1)

except Exception as exc:
if config.get("traceback"):
if (
# User has asked to alway see full tracebacks
config.get("traceback")
# The exception message is empty, so we need to provide _some_ actionable information
or not str(exc)
):
raise

from grpclib import GRPCError, Status
Expand Down Expand Up @@ -65,6 +71,8 @@ def main():
else:
title = "Error"
content = str(exc)
if notes := getattr(exc, "__notes__", []):
content = f"{content}\n\nNote: {' ' .join(notes)}"

console = Console(stderr=True)
panel = Panel(Text(content), title=title, title_align="left", border_style="red")
Expand Down
161 changes: 126 additions & 35 deletions modal/_asgi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright Modal Labs 2022
import asyncio
from typing import Any, AsyncGenerator, Callable, Dict, NoReturn, Optional, cast
from typing import Any, AsyncGenerator, Callable, Dict, NoReturn, Optional, Tuple, cast

import aiohttp

Expand All @@ -15,12 +15,82 @@
FIRST_MESSAGE_TIMEOUT_SECONDS = 5.0


def asgi_app_wrapper(asgi_app, function_io_manager) -> Callable[..., AsyncGenerator]:
class LifespanManager:
startup: asyncio.Future
shutdown: asyncio.Future
queue: asyncio.Queue
has_run_init: bool = False

def __init__(self, asgi_app, state):
self.asgi_app = asgi_app
self.state = state

async def ensure_init(self):
# making this async even though
# no async code since it has to run inside
# the event loop to tie the
# objects to the correct loop in python 3.9
if not self.has_run_init:
self.queue = asyncio.Queue()
self.startup = asyncio.Future()
self.shutdown = asyncio.Future()
self.has_run_init = True

async def background_task(self):
await self.ensure_init()

async def receive():
return await self.queue.get()

async def send(message):
if message["type"] == "lifespan.startup.complete":
self.startup.set_result(None)
elif message["type"] == "lifespan.startup.failed":
self.startup.set_exception(ExecutionError("ASGI lifespan startup failed"))
elif message["type"] == "lifespan.shutdown.complete":
self.shutdown.set_result(None)
elif message["type"] == "lifespan.shutdown.failed":
self.shutdown.set_exception(ExecutionError("ASGI lifespan shutdown failed"))
else:
raise ExecutionError(f"Unexpected message type: {message['type']}")

try:
await self.asgi_app({"type": "lifespan", "state": self.state}, receive, send)
except Exception as e:
logger.error(f"Error in ASGI lifespan task: {e}")
if not self.startup.done():
self.startup.set_exception(ExecutionError("ASGI lifespan task exited startup"))
if not self.shutdown.done():
self.shutdown.set_exception(ExecutionError("ASGI lifespan task exited shutdown"))
else:
if not self.startup.done():
self.startup.set_result("ASGI Lifespan protocol is probably not supported by this library")
if not self.shutdown.done():
self.shutdown.set_result("ASGI Lifespan protocol is probably not supported by this library")

async def lifespan_startup(self):
await self.ensure_init()
self.queue.put_nowait({"type": "lifespan.startup"})
await self.startup

async def lifespan_shutdown(self):
await self.ensure_init()
self.queue.put_nowait({"type": "lifespan.shutdown"})
await self.shutdown


def asgi_app_wrapper(asgi_app, container_io_manager) -> Tuple[Callable[..., AsyncGenerator], LifespanManager]:
state: Dict[str, Any] = {} # used for lifespan state

async def fn(scope):
if "state" in scope:
# we don't expect users to set state in ASGI scope
# this should be handled internally by the LifespanManager
raise ExecutionError("Unpexected state in ASGI scope")
scope["state"] = state
function_call_id = current_function_call_id()
assert function_call_id, "internal error: function_call_id not set in asgi_app() scope"

# TODO: Add support for the ASGI lifecycle spec.
messages_from_app: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(1)
messages_to_app: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(1)

Expand Down Expand Up @@ -55,10 +125,21 @@ async def fetch_data_in():
# This initial message, "http.request" or "websocket.connect", should be sent
# immediately after starting the ASGI app's function call. If it is not received, that
# indicates a request cancellation or other abnormal circumstance.
message_gen = function_io_manager.get_data_in.aio(function_call_id)
message_gen = container_io_manager.get_data_in.aio(function_call_id)
first_message_task = asyncio.create_task(message_gen.__anext__())

try:
first_message = await asyncio.wait_for(message_gen.__anext__(), FIRST_MESSAGE_TIMEOUT_SECONDS)
# we are intentionally shielding + manually cancelling first_message_task, since cancellations
# can otherwise get ignored in case the cancellation and an awaited future resolve gets
# triggered in the same sequence before handing back control to the event loop.
first_message = await asyncio.shield(
asyncio.wait_for(first_message_task, FIRST_MESSAGE_TIMEOUT_SECONDS)
)
except asyncio.CancelledError:
if not first_message_task.done():
# see comment above about manual cancellation
first_message_task.cancel()
raise
except (asyncio.TimeoutError, StopAsyncIteration):
# About `StopAsyncIteration` above: The generator shouldn't typically exit,
# but if it does, we handle it like a timeout in that case.
Expand Down Expand Up @@ -128,14 +209,14 @@ async def receive():
app_task.result() # consume/raise exceptions if there are any!
break

return fn
return fn, LifespanManager(asgi_app, state)


def wsgi_app_wrapper(wsgi_app, function_io_manager):
def wsgi_app_wrapper(wsgi_app, container_io_manager):
from ._vendor.a2wsgi_wsgi import WSGIMiddleware

asgi_app = WSGIMiddleware(wsgi_app, workers=10000, send_queue_size=1) # unlimited workers
return asgi_app_wrapper(asgi_app, function_io_manager)
return asgi_app_wrapper(asgi_app, container_io_manager)


def webhook_asgi_app(fn: Callable[..., Any], method: str, docs: bool):
Expand Down Expand Up @@ -316,45 +397,55 @@ async def upstream_to_client():
await asyncio.wait([client_to_upstream_task, upstream_to_client_task], return_when=asyncio.FIRST_COMPLETED)


async def _proxy_lifespan_request(base_url, scope, receive, send) -> None:
session: Optional[aiohttp.ClientSession] = None
while True:
message = await receive()
if message["type"] == "lifespan.startup":
if session is None:
session = aiohttp.ClientSession(
base_url,
cookie_jar=aiohttp.DummyCookieJar(),
timeout=aiohttp.ClientTimeout(total=3600),
auto_decompress=False,
read_bufsize=1024 * 1024, # 1 MiB
**(
# These options were introduced in aiohttp 3.9, and we can remove the
# conditional after deprecating image builder version 2023.12.
dict( # type: ignore
max_line_size=64 * 1024, # 64 KiB
max_field_size=64 * 1024, # 64 KiB
)
if parse_major_minor_version(aiohttp.__version__) >= (3, 9)
else {}
),
)
scope["state"]["session"] = session
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
if session is not None:
await session.close()
await send({"type": "lifespan.shutdown.complete"})
break
else:
raise ExecutionError(f"Unexpected message type: {message['type']}")


def web_server_proxy(host: str, port: int):
"""Return an ASGI app that proxies requests to a web server running on the same host."""
if not 0 < port < 65536:
raise InvalidError(f"Invalid port number: {port}")

base_url = f"http://{host}:{port}"
session: Optional[aiohttp.ClientSession] = None

async def web_server_proxy_app(scope, receive, send):
nonlocal session
if session is None:
# TODO: We currently create the ClientSession on container startup and never close it.
# This outputs an "Unclosed client session" warning during runner termination. We should
# properly close the session once we implement the ASGI lifespan protocol.
session = aiohttp.ClientSession(
base_url,
cookie_jar=aiohttp.DummyCookieJar(),
timeout=aiohttp.ClientTimeout(total=3600),
auto_decompress=False,
read_bufsize=1024 * 1024, # 1 MiB
**(
# These options were introduced in aiohttp 3.9, and we can remove the
# conditional after deprecating image builder version 2023.12.
dict( # type: ignore
max_line_size=64 * 1024, # 64 KiB
max_field_size=64 * 1024, # 64 KiB
)
if parse_major_minor_version(aiohttp.__version__) >= (3, 9)
else {}
),
)

try:
if scope["type"] == "lifespan":
pass # Do nothing for lifespan events.
await _proxy_lifespan_request(base_url, scope, receive, send)
elif scope["type"] == "http":
await _proxy_http_request(session, scope, receive, send)
await _proxy_http_request(scope["state"]["session"], scope, receive, send)
elif scope["type"] == "websocket":
await _proxy_websocket_request(session, scope, receive, send)
await _proxy_websocket_request(scope["state"]["session"], scope, receive, send)
else:
raise NotImplementedError(f"Scope {scope} is not understood")

Expand Down
Loading

0 comments on commit 8ff2435

Please sign in to comment.