Skip to content

Commit

Permalink
Merge branch 'main' into dlm/dont-limit-build-logs
Browse files Browse the repository at this point in the history
  • Loading branch information
dleviminzi committed Mar 4, 2025
2 parents c134a74 + 78fe5b6 commit 1bc1bb0
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 42 deletions.
23 changes: 23 additions & 0 deletions pkg/abstractions/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/beam-cloud/beta9/pkg/types"
)

const (
flushLogsTimeout = 500 * time.Millisecond
)

type LogStreamOpts struct {
SendCallback func(o common.OutputMsg) error
ExitCallback func(exitCode int32) error
Expand Down Expand Up @@ -97,6 +101,25 @@ _stream:
exitCode = -1
}

// After the container exits, flush remaining logs for up to flushLogsTimeout milliseconds
flushLogsTimer := time.NewTimer(flushLogsTimeout)
defer flushLogsTimer.Stop()

_flush:
for {
select {
case o, ok := <-outputChan:
if !ok {
break _flush
}
if err := l.sendCallback(o); err != nil {
break _flush
}
case <-flushLogsTimer.C:
break _flush
}
}

if err := l.exitCallback(int32(exitCode)); err != nil {
break _stream
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/abstractions/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
pipCommandType string = "pip"
shellCommandType string = "shell"
micromambaCommandType string = "micromamba"

dockerHubRegistry string = "docker.io"
)

type Builder struct {
Expand Down Expand Up @@ -244,6 +246,10 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
containerSpinupTimeout = b.calculateImageArchiveDuration(ctx, opts)
}

if creds, err := b.shouldUseDefaultDockerCreds(opts); err == nil {
opts.BaseImageCreds = creds
}

containerId := b.genContainerId()

containerRequest, err := b.generateContainerRequest(opts, dockerfile, containerId, authInfo.Workspace)
Expand Down Expand Up @@ -440,6 +446,20 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
return nil
}

func (b *Builder) shouldUseDefaultDockerCreds(opts *BuildOpts) (string, error) {
isDockerHub := opts.BaseImageRegistry == dockerHubRegistry
credsNotSet := opts.BaseImageCreds == ""

if isDockerHub && credsNotSet {
username := b.config.ImageService.Registries.Docker.Username
password := b.config.ImageService.Registries.Docker.Password
if username != "" && password != "" {
return fmt.Sprintf("%s:%s", username, password), nil
}
}
return "", errors.New("docker creds not set in config")
}

// generateContainerRequest generates a container request for the build container
func (b *Builder) generateContainerRequest(opts *BuildOpts, dockerfile *string, containerId string, workspace *types.Workspace) (*types.ContainerRequest, error) {
baseImageId, err := b.GetImageId(&BuildOpts{
Expand Down Expand Up @@ -569,7 +589,7 @@ func ExtractImageNameAndTag(imageRef string) (BaseImage, error) {

registry := result["Registry"]
if registry == "" {
registry = "docker.io"
registry = dockerHubRegistry
}

repo := result["Repo"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ imageService:
pythonVersion: python3.10
registries:
docker:
username: beamcloud
username:
password:
s3:
bucketName: beta9-images
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type AppConfig struct {
Database DatabaseConfig `key:"database" json:"database"`
GatewayService GatewayServiceConfig `key:"gateway" json:"gateway_service"`
FileService FileServiceConfig `key:"fileService" json:"file_service"`
ImageService ImageServiceConfig `key:"imageservice" json:"image_service"`
ImageService ImageServiceConfig `key:"imageService" json:"image_service"`
Storage StorageConfig `key:"storage" json:"storage"`
Worker WorkerConfig `key:"worker" json:"worker"`
Providers ProviderConfig `key:"providers" json:"providers"`
Expand Down
2 changes: 1 addition & 1 deletion sdk/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "beta9"
version = "0.1.179"
version = "0.1.180"
description = ""
authors = ["beam.cloud <[email protected]>"]
packages = [
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/beta9/abstractions/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
RunnerAbstraction,
)
from ..abstractions.image import Image
from ..abstractions.volume import Volume
from ..abstractions.volume import CloudBucket, Volume
from ..channel import with_grpc_error_handling
from ..clients.container import (
CommandExecutionRequest,
Expand Down Expand Up @@ -38,8 +38,8 @@ class Container(RunnerAbstraction):
specified but this value is set to 0, it will be automatically updated to 1.
image (Union[Image, dict]):
The container image used for the task execution. Default is [Image](#image).
volumes (Optional[List[Volume]]):
A list of volumes to be mounted to the container. Default is None.
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of volumes and/or cloud buckets to be mounted to the container. Default is None.
secrets (Optional[List[str]):
A list of secrets that are injected into the container as environment variables. Default is [].
name (Optional[str]):
Expand Down Expand Up @@ -67,7 +67,7 @@ def __init__(
gpu: Union[GpuTypeAlias, List[GpuTypeAlias]] = GpuType.NoGPU,
gpu_count: int = 0,
image: Image = Image(),
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
callback_url: Optional[str] = None,
on_deploy: Optional[AbstractCallableWrapper] = None,
Expand Down
20 changes: 10 additions & 10 deletions sdk/src/beta9/abstractions/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
RunnerAbstraction,
)
from ..abstractions.image import Image
from ..abstractions.volume import Volume
from ..abstractions.volume import CloudBucket, Volume
from ..channel import with_grpc_error_handling
from ..clients.endpoint import (
EndpointServeKeepAliveRequest,
Expand Down Expand Up @@ -52,8 +52,8 @@ class Endpoint(RunnerAbstraction):
specified but this value is set to 0, it will be automatically updated to 1.
image (Union[Image, dict]):
The container image used for the task execution. Default is [Image](#image).
volumes (Optional[List[Volume]]):
A list of volumes to be mounted to the endpoint. Default is None.
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of volumes and/or cloud buckets to be mounted to the endpoint. Default is None.
timeout (Optional[int]):
The maximum number of seconds a task can run before it times out.
Default is 3600. Set it to -1 to disable the timeout.
Expand Down Expand Up @@ -127,7 +127,7 @@ def __init__(
max_pending_tasks: int = 100,
on_start: Optional[Callable] = None,
on_deploy: Optional[AbstractCallableWrapper] = None,
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
name: Optional[str] = None,
Expand Down Expand Up @@ -192,8 +192,8 @@ class ASGI(Endpoint):
specified but this value is set to 0, it will be automatically updated to 1.
image (Union[Image, dict]):
The container image used for the task execution. Default is [Image](#image).
volumes (Optional[List[Volume]]):
A list of volumes to be mounted to the ASGI application. Default is None.
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of volumes and/or cloud buckets to be mounted to the ASGI application. Default is None.
timeout (Optional[int]):
The maximum number of seconds a task can run before it times out.
Default is 3600. Set it to -1 to disable the timeout.
Expand Down Expand Up @@ -283,7 +283,7 @@ def __init__(
max_pending_tasks: int = 100,
on_start: Optional[Callable] = None,
on_deploy: Optional[AbstractCallableWrapper] = None,
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
name: Optional[str] = None,
Expand Down Expand Up @@ -341,8 +341,8 @@ class RealtimeASGI(ASGI):
specified but this value is set to 0, it will be automatically updated to 1.
image (Union[Image, dict]):
The container image used for the task execution. Default is [Image](#image).
volumes (Optional[List[Volume]]):
A list of volumes to be mounted to the ASGI application. Default is None.
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of volumes and/or cloud buckets to be mounted to the ASGI application. Default is None.
timeout (Optional[int]):
The maximum number of seconds a task can run before it times out.
Default is 3600. Set it to -1 to disable the timeout.
Expand Down Expand Up @@ -418,7 +418,7 @@ def __init__(
max_pending_tasks: int = 100,
on_start: Optional[Callable] = None,
on_deploy: Optional[AbstractCallableWrapper] = None,
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
name: Optional[str] = None,
Expand Down
16 changes: 8 additions & 8 deletions sdk/src/beta9/abstractions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
RunnerAbstraction,
)
from ..abstractions.image import Image
from ..abstractions.volume import Volume
from ..abstractions.volume import CloudBucket, Volume
from ..channel import with_grpc_error_handling
from ..clients.function import (
FunctionInvokeRequest,
Expand Down Expand Up @@ -56,8 +56,8 @@ class Function(RunnerAbstraction):
The maximum number of times a task will be retried if the container crashes. Default is 3.
callback_url (Optional[str]):
An optional URL to send a callback to when a task is completed, timed out, or cancelled.
volumes (Optional[List[Volume]]):
A list of storage volumes to be associated with the function. Default is [].
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of storage volumes and/or cloud buckets to be associated with the function. Default is [].
secrets (Optional[List[str]):
A list of secrets that are injected into the container as environment variables. Default is [].
env (Optional[Dict[str, str]]):
Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(
timeout: int = 3600,
retries: int = 3,
callback_url: Optional[str] = None,
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
name: Optional[str] = None,
Expand Down Expand Up @@ -284,7 +284,7 @@ def invocation_details(self, **kwargs) -> None:
next_run_utc = cron_utc.get_next(datetime)
terminal.print(
(
f" [bright_white]{i+1}.[/bright_white] {next_run_utc:%Y-%m-%d %H:%M:%S %Z} "
f" [bright_white]{i + 1}.[/bright_white] {next_run_utc:%Y-%m-%d %H:%M:%S %Z} "
f"({next_run:%Y-%m-%d %H:%M:%S} {local_tz})"
)
)
Expand Down Expand Up @@ -318,8 +318,8 @@ class Schedule(Function):
The maximum number of times a task will be retried if the container crashes. Default is 3.
callback_url (Optional[str]):
An optional URL to send a callback to when a task is completed, timed out, or cancelled.
volumes (Optional[List[Volume]]):
A list of storage volumes to be associated with the function. Default is [].
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of storage volumes and/or cloud buckets to be associated with the function. Default is [].
secrets (Optional[List[str]):
A list of secrets that are injected into the container as environment variables. Default is [].
env (Optional[Dict[str, str]]):
Expand Down Expand Up @@ -359,7 +359,7 @@ def __init__(
timeout: int = 3600,
retries: int = 3,
callback_url: Optional[str] = None,
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
name: Optional[str] = None,
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/beta9/abstractions/integrations/vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ...abstractions.base.runner import ASGI_DEPLOYMENT_STUB_TYPE, ASGI_SERVE_STUB_TYPE
from ...abstractions.endpoint import ASGI
from ...abstractions.image import Image
from ...abstractions.volume import Volume
from ...abstractions.volume import CloudBucket, Volume
from ...channel import with_grpc_error_handling
from ...clients.endpoint import (
EndpointServeKeepAliveRequest,
Expand Down Expand Up @@ -172,8 +172,8 @@ class VLLM(ASGI):
Whether the endpoints require authorization. Default is True.
name (str):
The name of the container. Default is none, which means you must provide it during deployment.
volumes (List[Volume]):
The volumes to mount into the container. Default is a single volume named "vllm_cache" mounted to "./vllm_cache".
volumes (List[Union[Volume, CloudBucket]]):
The volumes and/or cloud buckets to mount into the container. Default is a single volume named "vllm_cache" mounted to "./vllm_cache".
It is used as the download directory for vLLM models.
secrets (List[str]):
The secrets to pass to the container. If you need huggingface authentication to download models, you should set HF_TOKEN in the secrets.
Expand Down Expand Up @@ -208,7 +208,7 @@ def __init__(
timeout: int = 3600,
authorized: bool = True,
name: Optional[str] = None,
volumes: Optional[List[Volume]] = [],
volumes: Optional[List[Union[Volume, CloudBucket]]] = [],
secrets: Optional[List[str]] = None,
autoscaler: Autoscaler = QueueDepthAutoscaler(),
vllm_args: VLLMArgs = VLLMArgs(),
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/beta9/abstractions/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
RunnerAbstraction,
)
from ..abstractions.image import Image
from ..abstractions.volume import Volume
from ..abstractions.volume import CloudBucket, Volume
from ..channel import with_grpc_error_handling
from ..clients.gateway import (
DeployStubRequest,
Expand Down Expand Up @@ -93,8 +93,8 @@ class Pod(RunnerAbstraction):
specified but this value is set to 0, it will be automatically updated to 1.
image (Union[Image, dict]):
The container image used for the task execution. Default is [Image](#image).
volumes (Optional[List[Volume]]):
A list of volumes to be mounted to the pod. Default is None.
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of volumes and/or cloud buckets to be mounted to the pod. Default is None.
secrets (Optional[List[str]):
A list of secrets that are injected into the pod as environment variables. Default is [].
env (Optional[Dict[str, str]]):
Expand Down Expand Up @@ -129,7 +129,7 @@ def __init__(
gpu: Union[GpuTypeAlias, List[GpuTypeAlias]] = GpuType.NoGPU,
gpu_count: int = 0,
image: Image = Image(),
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
keep_warm_seconds: int = 600,
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/beta9/abstractions/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
RunnerAbstraction,
)
from ..abstractions.image import Image
from ..abstractions.volume import Volume
from ..abstractions.volume import CloudBucket, Volume
from ..channel import with_grpc_error_handling
from ..clients.taskqueue import (
StartTaskQueueServeRequest,
Expand Down Expand Up @@ -73,8 +73,8 @@ class TaskQueue(RunnerAbstraction):
loading models, or anything else computationally expensive.
callback_url (Optional[str]):
An optional URL to send a callback to when a task is completed, timed out, or cancelled.
volumes (Optional[List[Volume]]):
A list of storage volumes to be associated with the taskqueue. Default is [].
volumes (Optional[List[Union[Volume, CloudBucket]]]):
A list of storage volumes and/or cloud buckets to be associated with the taskqueue. Default is [].
secrets (Optional[List[str]):
A list of secrets that are injected into the container as environment variables. Default is [].
env (Optional[Dict[str, str]]):
Expand Down Expand Up @@ -127,7 +127,7 @@ def __init__(
on_start: Optional[Callable] = None,
on_deploy: Optional[AbstractCallableWrapper] = None,
callback_url: Optional[str] = None,
volumes: Optional[List[Volume]] = None,
volumes: Optional[List[Union[Volume, CloudBucket]]] = None,
secrets: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = {},
name: Optional[str] = None,
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/beta9/abstractions/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class CloudBucketConfig:
read_only (bool):
Whether the volume is read-only.
access_key (str):
The S3 access key for the external provider.
The name of the beam secret containing the S3 access key for the external provider.
secret_key (str):
The S3 secret key for the external provider.
The name of the beam secret containing the S3 secret key for the external provider.
endpoint (Optional[str]):
The S3 endpoint for the external provider.
region (Optional[str]):
Expand Down Expand Up @@ -125,8 +125,8 @@ def __init__(self, name: str, mount_path: str, config: CloudBucketConfig) -> Non
name="other_model_weights",
mount_path="./other-weights",
config=CloudBucketConfig(
access_key="my-access-key",
secret_key="my-secret-key",
access_key="MY_ACCESS_KEY_SECRET",
secret_key="MY_SECRET_KEY_SECRET",
endpoint="https://s3-endpoint.com",
),
)
Expand Down

0 comments on commit 1bc1bb0

Please sign in to comment.