Skip to content

Commit

Permalink
Merge branch 'main' into cathy/flexible-ci-adv
Browse files Browse the repository at this point in the history
  • Loading branch information
cathyzbn authored Aug 30, 2024
2 parents 15e7026 + 194053b commit c5e854f
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 12 deletions.
6 changes: 3 additions & 3 deletions modal/_container_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ class DaemonizedThreadPool:
# Used instead of ThreadPoolExecutor, since the latter won't allow
# the interpreter to shut down before the currently running tasks
# have finished
def __init__(self, container_io_manager: ContainerIOManager):
self.container_io_manager = container_io_manager
def __init__(self, max_threads: int):
self.max_threads = max_threads

def __enter__(self):
self.spawned_workers = 0
Expand Down Expand Up @@ -416,7 +416,7 @@ def run_input_sync(io_context: IOContext) -> None:
reset_context()

if container_io_manager.target_concurrency > 1:
with DaemonizedThreadPool(container_io_manager) as thread_pool:
with DaemonizedThreadPool(max_threads=container_io_manager.max_concurrency) as thread_pool:

def make_async_cancel_callback(task):
def f():
Expand Down
13 changes: 7 additions & 6 deletions modal/_container_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ def _init(self, container_args: api_pb2.ContainerArguments, client: _Client):
target_concurrency = 1
max_concurrency = 0
else:
target_concurrency = container_args.function_def.target_concurrent_inputs or 1
max_concurrency = container_args.function_def.max_concurrent_inputs or 0
target_concurrency = container_args.function_def.allow_concurrent_inputs or 1
max_concurrency = container_args.function_def.max_concurrent_inputs or target_concurrency

self._target_concurrency = target_concurrency
self._max_concurrency = max_concurrency
Expand Down Expand Up @@ -405,7 +405,6 @@ async def dynamic_concurrency_manager(self) -> AsyncGenerator[None, None]:
async def _dynamic_concurrency_loop(self):
logger.debug(f"Starting dynamic concurrency loop for task {self.task_id}")
while self._allow_dynamic_concurrency:
t0 = time.monotonic()
try:
request = api_pb2.FunctionGetDynamicConcurrencyRequest(
function_id=self.function_id,
Expand All @@ -424,9 +423,7 @@ async def _dynamic_concurrency_loop(self):
except Exception as exc:
logger.debug(f"Failed to get dynamic concurrency for task {self.task_id}, {exc}")

duration = time.monotonic() - t0
time_until_next = max(0.0, DYNAMIC_CONCURRENCY_INTERVAL_SECS - duration)
await asyncio.sleep(time_until_next)
await asyncio.sleep(DYNAMIC_CONCURRENCY_INTERVAL_SECS)

async def get_app_objects(self) -> RunningApp:
req = api_pb2.AppGetObjectsRequest(app_id=self.app_id, include_unindexed=True)
Expand Down Expand Up @@ -942,6 +939,10 @@ async def interact(self, from_breakpoint: bool = False):
def target_concurrency(self) -> int:
return self._target_concurrency

@property
def max_concurrency(self) -> int:
return self._max_concurrency

@classmethod
def get_input_concurrency(cls) -> int:
io_manager = cls._singleton
Expand Down
39 changes: 38 additions & 1 deletion modal_proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,11 @@ message ContainerHeartbeatResponse {
optional CancelInputEvent cancel_input_event = 1;
}

message ContainerLogRequest {
string message = 1;
string input_id = 2;
}

message ContainerStopRequest {
string task_id = 1 [ (modal.options.audit_target_attr) = true ];
}
Expand Down Expand Up @@ -1564,6 +1569,15 @@ message PTYInfo {
PTYType pty_type = 7;
}

message PortSpec {
uint32 port = 1;
bool unencrypted = 2;
}

message PortSpecs {
repeated PortSpec ports = 1;
}

message ProxyGetOrCreateRequest {
string deployment_name = 1 [ (modal.options.audit_target_attr) = true ];
DeploymentNamespace namespace = 2;
Expand Down Expand Up @@ -1751,6 +1765,9 @@ message Sandbox {
repeated Resources _experimental_resources = 18; // overrides `resources` field above

string worker_id = 19; // for internal debugging use only
oneof open_ports_oneof {
PortSpecs open_ports = 20;
}
}

message SandboxCreateRequest {
Expand All @@ -1777,6 +1794,16 @@ message SandboxGetTaskIdResponse {
string task_id = 1;
}

message SandboxGetTunnelsRequest {
bool poll = 1;
string sandbox_id = 2;
}

message SandboxGetTunnelsResponse {
GenericResult result = 1;
repeated TunnelData tunnels = 2;
}

message SandboxHandleMetadata {
GenericResult result = 1;
}
Expand Down Expand Up @@ -1848,11 +1875,12 @@ message SchedulerPlacement {
// - Fold in cloud, resource needs here too.
// - Allow specifying list of zones, cloud, fallback and alternative
// GPU types.
optional string _region = 1 [deprecated=true];

repeated string regions = 4;
optional string _zone = 2;
optional string _lifecycle = 3; // "on-demand" or "spot", else ignored

reserved 1;
}

message SecretCreateRequest { // Not used by client anymore
Expand Down Expand Up @@ -2071,6 +2099,13 @@ message TokenFlowWaitResponse {
string workspace_username = 4;
}

message TunnelData {
string host = 1;
uint32 port = 2;
optional string unencrypted_host = 3;
optional uint32 unencrypted_port = 4;
}

message TunnelStartRequest {
uint32 port = 1;
bool unencrypted = 2;
Expand Down Expand Up @@ -2273,6 +2308,7 @@ service ModalClient {
rpc ContainerExecPutInput(ContainerExecPutInputRequest) returns (google.protobuf.Empty);
rpc ContainerExecWait(ContainerExecWaitRequest) returns (ContainerExecWaitResponse);
rpc ContainerHeartbeat(ContainerHeartbeatRequest) returns (ContainerHeartbeatResponse);
rpc ContainerLog(ContainerLogRequest) returns (google.protobuf.Empty);
rpc ContainerStop(ContainerStopRequest) returns (ContainerStopResponse);

// Dicts
Expand Down Expand Up @@ -2350,6 +2386,7 @@ service ModalClient {
rpc SandboxCreate(SandboxCreateRequest) returns (SandboxCreateResponse);
rpc SandboxGetLogs(SandboxGetLogsRequest) returns (stream TaskLogsBatch);
rpc SandboxGetTaskId(SandboxGetTaskIdRequest) returns (SandboxGetTaskIdResponse); // needed for modal container exec
rpc SandboxGetTunnels(SandboxGetTunnelsRequest) returns (SandboxGetTunnelsResponse);
rpc SandboxList(SandboxListRequest) returns (SandboxListResponse);
rpc SandboxStdinWrite(SandboxStdinWriteRequest) returns (SandboxStdinWriteResponse);
rpc SandboxTerminate(SandboxTerminateRequest) returns (SandboxTerminateResponse);
Expand Down
2 changes: 1 addition & 1 deletion modal_version/_version_generated.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright Modal Labs 2024

# Note: Reset this value to -1 whenever you make a minor `0.X` release of the client.
build_number = 59 # git: 26ee5ab
build_number = 64 # git: 0440ad6
1 change: 0 additions & 1 deletion test/container_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2066,7 +2066,6 @@ def test_max_concurrency(servicer):
outputs = [deserialize(item.result.data, ret.client) for item in ret.items]
assert n_inputs in outputs


@skip_github_non_linux
def test_set_local_concurrent_inputs(servicer):
n_inputs = 5
Expand Down

0 comments on commit c5e854f

Please sign in to comment.