diff --git a/pkg/abstractions/common/logs.go b/pkg/abstractions/common/logs.go index e691bff6e..2b942481a 100644 --- a/pkg/abstractions/common/logs.go +++ b/pkg/abstractions/common/logs.go @@ -12,6 +12,10 @@ import ( "github.com/beam-cloud/beta9/pkg/types" ) +const ( + flushLogsTimeout = 500 * time.Millisecond +) + type LogStreamOpts struct { SendCallback func(o common.OutputMsg) error ExitCallback func(exitCode int32) error @@ -97,6 +101,25 @@ _stream: exitCode = -1 } + // After the container exits, flush remaining logs for up to flushLogsTimeout milliseconds + flushLogsTimer := time.NewTimer(flushLogsTimeout) + defer flushLogsTimer.Stop() + + _flush: + for { + select { + case o, ok := <-outputChan: + if !ok { + break _flush + } + if err := l.sendCallback(o); err != nil { + break _flush + } + case <-flushLogsTimer.C: + break _flush + } + } + if err := l.exitCallback(int32(exitCode)); err != nil { break _stream } diff --git a/pkg/abstractions/image/build.go b/pkg/abstractions/image/build.go index 309ede0a7..ba4df439c 100644 --- a/pkg/abstractions/image/build.go +++ b/pkg/abstractions/image/build.go @@ -37,6 +37,8 @@ const ( pipCommandType string = "pip" shellCommandType string = "shell" micromambaCommandType string = "micromamba" + + dockerHubRegistry string = "docker.io" ) type Builder struct { @@ -244,6 +246,10 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co containerSpinupTimeout = b.calculateImageArchiveDuration(ctx, opts) } + if creds, err := b.shouldUseDefaultDockerCreds(opts); err == nil { + opts.BaseImageCreds = creds + } + containerId := b.genContainerId() containerRequest, err := b.generateContainerRequest(opts, dockerfile, containerId, authInfo.Workspace) @@ -440,6 +446,20 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co return nil } +func (b *Builder) shouldUseDefaultDockerCreds(opts *BuildOpts) (string, error) { + isDockerHub := opts.BaseImageRegistry == dockerHubRegistry + credsNotSet := opts.BaseImageCreds == "" + + if isDockerHub && credsNotSet { + username := b.config.ImageService.Registries.Docker.Username + password := b.config.ImageService.Registries.Docker.Password + if username != "" && password != "" { + return fmt.Sprintf("%s:%s", username, password), nil + } + } + return "", errors.New("docker creds not set in config") +} + // generateContainerRequest generates a container request for the build container func (b *Builder) generateContainerRequest(opts *BuildOpts, dockerfile *string, containerId string, workspace *types.Workspace) (*types.ContainerRequest, error) { baseImageId, err := b.GetImageId(&BuildOpts{ @@ -569,7 +589,7 @@ func ExtractImageNameAndTag(imageRef string) (BaseImage, error) { registry := result["Registry"] if registry == "" { - registry = "docker.io" + registry = dockerHubRegistry } repo := result["Repo"] diff --git a/pkg/common/config.default.yaml b/pkg/common/config.default.yaml index 0f67d8432..2f3b439f1 100644 --- a/pkg/common/config.default.yaml +++ b/pkg/common/config.default.yaml @@ -75,7 +75,7 @@ imageService: pythonVersion: python3.10 registries: docker: - username: beamcloud + username: password: s3: bucketName: beta9-images diff --git a/pkg/types/config.go b/pkg/types/config.go index 79d80c4c4..2c7ef46df 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -17,7 +17,7 @@ type AppConfig struct { Database DatabaseConfig `key:"database" json:"database"` GatewayService GatewayServiceConfig `key:"gateway" json:"gateway_service"` FileService FileServiceConfig `key:"fileService" json:"file_service"` - ImageService ImageServiceConfig `key:"imageservice" json:"image_service"` + ImageService ImageServiceConfig `key:"imageService" json:"image_service"` Storage StorageConfig `key:"storage" json:"storage"` Worker WorkerConfig `key:"worker" json:"worker"` Providers ProviderConfig `key:"providers" json:"providers"` diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index 00d7685ca..0606e8590 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "beta9" -version = "0.1.179" +version = "0.1.180" description = "" authors = ["beam.cloud "] packages = [ diff --git a/sdk/src/beta9/abstractions/container.py b/sdk/src/beta9/abstractions/container.py index 43a52e33a..33f320d84 100644 --- a/sdk/src/beta9/abstractions/container.py +++ b/sdk/src/beta9/abstractions/container.py @@ -7,7 +7,7 @@ RunnerAbstraction, ) from ..abstractions.image import Image -from ..abstractions.volume import Volume +from ..abstractions.volume import CloudBucket, Volume from ..channel import with_grpc_error_handling from ..clients.container import ( CommandExecutionRequest, @@ -38,8 +38,8 @@ class Container(RunnerAbstraction): specified but this value is set to 0, it will be automatically updated to 1. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). - volumes (Optional[List[Volume]]): - A list of volumes to be mounted to the container. Default is None. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of volumes and/or cloud buckets to be mounted to the container. Default is None. secrets (Optional[List[str]): A list of secrets that are injected into the container as environment variables. Default is []. name (Optional[str]): @@ -67,7 +67,7 @@ def __init__( gpu: Union[GpuTypeAlias, List[GpuTypeAlias]] = GpuType.NoGPU, gpu_count: int = 0, image: Image = Image(), - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, callback_url: Optional[str] = None, on_deploy: Optional[AbstractCallableWrapper] = None, diff --git a/sdk/src/beta9/abstractions/endpoint.py b/sdk/src/beta9/abstractions/endpoint.py index f4fbc976e..6baf890ff 100644 --- a/sdk/src/beta9/abstractions/endpoint.py +++ b/sdk/src/beta9/abstractions/endpoint.py @@ -18,7 +18,7 @@ RunnerAbstraction, ) from ..abstractions.image import Image -from ..abstractions.volume import Volume +from ..abstractions.volume import CloudBucket, Volume from ..channel import with_grpc_error_handling from ..clients.endpoint import ( EndpointServeKeepAliveRequest, @@ -52,8 +52,8 @@ class Endpoint(RunnerAbstraction): specified but this value is set to 0, it will be automatically updated to 1. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). - volumes (Optional[List[Volume]]): - A list of volumes to be mounted to the endpoint. Default is None. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of volumes and/or cloud buckets to be mounted to the endpoint. Default is None. timeout (Optional[int]): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. @@ -127,7 +127,7 @@ def __init__( max_pending_tasks: int = 100, on_start: Optional[Callable] = None, on_deploy: Optional[AbstractCallableWrapper] = None, - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, name: Optional[str] = None, @@ -192,8 +192,8 @@ class ASGI(Endpoint): specified but this value is set to 0, it will be automatically updated to 1. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). - volumes (Optional[List[Volume]]): - A list of volumes to be mounted to the ASGI application. Default is None. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of volumes and/or cloud buckets to be mounted to the ASGI application. Default is None. timeout (Optional[int]): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. @@ -283,7 +283,7 @@ def __init__( max_pending_tasks: int = 100, on_start: Optional[Callable] = None, on_deploy: Optional[AbstractCallableWrapper] = None, - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, name: Optional[str] = None, @@ -341,8 +341,8 @@ class RealtimeASGI(ASGI): specified but this value is set to 0, it will be automatically updated to 1. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). - volumes (Optional[List[Volume]]): - A list of volumes to be mounted to the ASGI application. Default is None. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of volumes and/or cloud buckets to be mounted to the ASGI application. Default is None. timeout (Optional[int]): The maximum number of seconds a task can run before it times out. Default is 3600. Set it to -1 to disable the timeout. @@ -418,7 +418,7 @@ def __init__( max_pending_tasks: int = 100, on_start: Optional[Callable] = None, on_deploy: Optional[AbstractCallableWrapper] = None, - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, name: Optional[str] = None, diff --git a/sdk/src/beta9/abstractions/function.py b/sdk/src/beta9/abstractions/function.py index 8b2bae0a9..74b446e87 100644 --- a/sdk/src/beta9/abstractions/function.py +++ b/sdk/src/beta9/abstractions/function.py @@ -15,7 +15,7 @@ RunnerAbstraction, ) from ..abstractions.image import Image -from ..abstractions.volume import Volume +from ..abstractions.volume import CloudBucket, Volume from ..channel import with_grpc_error_handling from ..clients.function import ( FunctionInvokeRequest, @@ -56,8 +56,8 @@ class Function(RunnerAbstraction): The maximum number of times a task will be retried if the container crashes. Default is 3. callback_url (Optional[str]): An optional URL to send a callback to when a task is completed, timed out, or cancelled. - volumes (Optional[List[Volume]]): - A list of storage volumes to be associated with the function. Default is []. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of storage volumes and/or cloud buckets to be associated with the function. Default is []. secrets (Optional[List[str]): A list of secrets that are injected into the container as environment variables. Default is []. env (Optional[Dict[str, str]]): @@ -99,7 +99,7 @@ def __init__( timeout: int = 3600, retries: int = 3, callback_url: Optional[str] = None, - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, name: Optional[str] = None, @@ -284,7 +284,7 @@ def invocation_details(self, **kwargs) -> None: next_run_utc = cron_utc.get_next(datetime) terminal.print( ( - f" [bright_white]{i+1}.[/bright_white] {next_run_utc:%Y-%m-%d %H:%M:%S %Z} " + f" [bright_white]{i + 1}.[/bright_white] {next_run_utc:%Y-%m-%d %H:%M:%S %Z} " f"({next_run:%Y-%m-%d %H:%M:%S} {local_tz})" ) ) @@ -318,8 +318,8 @@ class Schedule(Function): The maximum number of times a task will be retried if the container crashes. Default is 3. callback_url (Optional[str]): An optional URL to send a callback to when a task is completed, timed out, or cancelled. - volumes (Optional[List[Volume]]): - A list of storage volumes to be associated with the function. Default is []. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of storage volumes and/or cloud buckets to be associated with the function. Default is []. secrets (Optional[List[str]): A list of secrets that are injected into the container as environment variables. Default is []. env (Optional[Dict[str, str]]): @@ -359,7 +359,7 @@ def __init__( timeout: int = 3600, retries: int = 3, callback_url: Optional[str] = None, - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, name: Optional[str] = None, diff --git a/sdk/src/beta9/abstractions/integrations/vllm.py b/sdk/src/beta9/abstractions/integrations/vllm.py index 1746a6e0a..a00fb64a0 100644 --- a/sdk/src/beta9/abstractions/integrations/vllm.py +++ b/sdk/src/beta9/abstractions/integrations/vllm.py @@ -8,7 +8,7 @@ from ...abstractions.base.runner import ASGI_DEPLOYMENT_STUB_TYPE, ASGI_SERVE_STUB_TYPE from ...abstractions.endpoint import ASGI from ...abstractions.image import Image -from ...abstractions.volume import Volume +from ...abstractions.volume import CloudBucket, Volume from ...channel import with_grpc_error_handling from ...clients.endpoint import ( EndpointServeKeepAliveRequest, @@ -172,8 +172,8 @@ class VLLM(ASGI): Whether the endpoints require authorization. Default is True. name (str): The name of the container. Default is none, which means you must provide it during deployment. - volumes (List[Volume]): - The volumes to mount into the container. Default is a single volume named "vllm_cache" mounted to "./vllm_cache". + volumes (List[Union[Volume, CloudBucket]]): + The volumes and/or cloud buckets to mount into the container. Default is a single volume named "vllm_cache" mounted to "./vllm_cache". It is used as the download directory for vLLM models. secrets (List[str]): The secrets to pass to the container. If you need huggingface authentication to download models, you should set HF_TOKEN in the secrets. @@ -208,7 +208,7 @@ def __init__( timeout: int = 3600, authorized: bool = True, name: Optional[str] = None, - volumes: Optional[List[Volume]] = [], + volumes: Optional[List[Union[Volume, CloudBucket]]] = [], secrets: Optional[List[str]] = None, autoscaler: Autoscaler = QueueDepthAutoscaler(), vllm_args: VLLMArgs = VLLMArgs(), diff --git a/sdk/src/beta9/abstractions/pod.py b/sdk/src/beta9/abstractions/pod.py index 9cfad82af..6038a694a 100644 --- a/sdk/src/beta9/abstractions/pod.py +++ b/sdk/src/beta9/abstractions/pod.py @@ -12,7 +12,7 @@ RunnerAbstraction, ) from ..abstractions.image import Image -from ..abstractions.volume import Volume +from ..abstractions.volume import CloudBucket, Volume from ..channel import with_grpc_error_handling from ..clients.gateway import ( DeployStubRequest, @@ -93,8 +93,8 @@ class Pod(RunnerAbstraction): specified but this value is set to 0, it will be automatically updated to 1. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). - volumes (Optional[List[Volume]]): - A list of volumes to be mounted to the pod. Default is None. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of volumes and/or cloud buckets to be mounted to the pod. Default is None. secrets (Optional[List[str]): A list of secrets that are injected into the pod as environment variables. Default is []. env (Optional[Dict[str, str]]): @@ -129,7 +129,7 @@ def __init__( gpu: Union[GpuTypeAlias, List[GpuTypeAlias]] = GpuType.NoGPU, gpu_count: int = 0, image: Image = Image(), - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, keep_warm_seconds: int = 600, diff --git a/sdk/src/beta9/abstractions/taskqueue.py b/sdk/src/beta9/abstractions/taskqueue.py index 908c3e7b1..f06e212b1 100644 --- a/sdk/src/beta9/abstractions/taskqueue.py +++ b/sdk/src/beta9/abstractions/taskqueue.py @@ -12,7 +12,7 @@ RunnerAbstraction, ) from ..abstractions.image import Image -from ..abstractions.volume import Volume +from ..abstractions.volume import CloudBucket, Volume from ..channel import with_grpc_error_handling from ..clients.taskqueue import ( StartTaskQueueServeRequest, @@ -73,8 +73,8 @@ class TaskQueue(RunnerAbstraction): loading models, or anything else computationally expensive. callback_url (Optional[str]): An optional URL to send a callback to when a task is completed, timed out, or cancelled. - volumes (Optional[List[Volume]]): - A list of storage volumes to be associated with the taskqueue. Default is []. + volumes (Optional[List[Union[Volume, CloudBucket]]]): + A list of storage volumes and/or cloud buckets to be associated with the taskqueue. Default is []. secrets (Optional[List[str]): A list of secrets that are injected into the container as environment variables. Default is []. env (Optional[Dict[str, str]]): @@ -127,7 +127,7 @@ def __init__( on_start: Optional[Callable] = None, on_deploy: Optional[AbstractCallableWrapper] = None, callback_url: Optional[str] = None, - volumes: Optional[List[Volume]] = None, + volumes: Optional[List[Union[Volume, CloudBucket]]] = None, secrets: Optional[List[str]] = None, env: Optional[Dict[str, str]] = {}, name: Optional[str] = None, diff --git a/sdk/src/beta9/abstractions/volume.py b/sdk/src/beta9/abstractions/volume.py index 9a59d9625..32a5492b8 100644 --- a/sdk/src/beta9/abstractions/volume.py +++ b/sdk/src/beta9/abstractions/volume.py @@ -85,9 +85,9 @@ class CloudBucketConfig: read_only (bool): Whether the volume is read-only. access_key (str): - The S3 access key for the external provider. + The name of the beam secret containing the S3 access key for the external provider. secret_key (str): - The S3 secret key for the external provider. + The name of the beam secret containing the S3 secret key for the external provider. endpoint (Optional[str]): The S3 endpoint for the external provider. region (Optional[str]): @@ -125,8 +125,8 @@ def __init__(self, name: str, mount_path: str, config: CloudBucketConfig) -> Non name="other_model_weights", mount_path="./other-weights", config=CloudBucketConfig( - access_key="my-access-key", - secret_key="my-secret-key", + access_key="MY_ACCESS_KEY_SECRET", + secret_key="MY_SECRET_KEY_SECRET", endpoint="https://s3-endpoint.com", ), )