Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GPU blocks property #2253

Merged
merged 8 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/dstack/_internal/cli/utils/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def get_fleets_table(
resources = instance.instance_type.resources.pretty_format(include_spot=True)

status = instance.status.value
total_blocks = instance.total_blocks
busy_blocks = instance.busy_blocks
if total_blocks is not None and busy_blocks is not None:
if busy_blocks < total_blocks and instance.status == InstanceStatus.BUSY:
idle_blocks = total_blocks - busy_blocks
status = f"{idle_blocks}/{total_blocks} {InstanceStatus.IDLE.value}"
if (
instance.status in [InstanceStatus.IDLE, InstanceStatus.BUSY]
and instance.unreachable
Expand Down
20 changes: 15 additions & 5 deletions src/dstack/_internal/cli/utils/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from rich.table import Table

from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console
from dstack._internal.core.models.instances import InstanceAvailability
from dstack._internal.core.models.instances import InstanceAvailability, InstanceSharedOffer
from dstack._internal.core.models.profiles import (
DEFAULT_RUN_TERMINATION_IDLE_TIME,
TerminationPolicy,
Expand Down Expand Up @@ -96,11 +96,14 @@ def th(s: str) -> str:
InstanceAvailability.BUSY,
}:
availability = offer.availability.value.replace("_", " ").lower()
instance = offer.instance.name
if isinstance(offer, InstanceSharedOffer):
instance += f" ({offer.blocks}/{offer.total_blocks})"
offers.add_row(
f"{i}",
offer.backend.replace("remote", "ssh"),
offer.region,
offer.instance.name,
instance,
r.pretty_format(),
"yes" if r.spot else "no",
f"${offer.price:g}",
Expand Down Expand Up @@ -161,13 +164,20 @@ def get_runs_table(
"SUBMITTED": format_date(job.job_submissions[-1].submitted_at),
"ERROR": _get_job_error(job),
}
jpd = job.job_submissions[-1].job_provisioning_data
latest_job_submission = job.job_submissions[-1]
jpd = latest_job_submission.job_provisioning_data
if jpd is not None:
resources = jpd.instance_type.resources
instance = jpd.instance_type.name
jrd = latest_job_submission.job_runtime_data
if jrd is not None and jrd.offer is not None:
resources = jrd.offer.instance.resources
instance += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})"
job_row.update(
{
"BACKEND": f"{jpd.backend.value.replace('remote', 'ssh')} ({jpd.region})",
"INSTANCE": jpd.instance_type.name,
"RESOURCES": jpd.instance_type.resources.pretty_format(include_spot=True),
"INSTANCE": instance,
"RESOURCES": resources.pretty_format(include_spot=True),
"RESERVATION": jpd.reservation,
"PRICE": f"${jpd.price:.4}",
}
Expand Down
22 changes: 22 additions & 0 deletions src/dstack/_internal/core/models/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ class SSHHostParams(CoreModel):
] = None
ssh_key: Optional[SSHKey] = None

blocks: Annotated[
Union[Literal["auto"], int],
Field(
description=(
"The amount of blocks to split the instance into, a number or `auto` (e.g., `4`)."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe elaborate what auto means?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" Defaults to `1` (do not split)"
),
ge=1,
),
] = 1

@validator("internal_ip")
def validate_internal_ip(cls, value):
if value is None:
Expand Down Expand Up @@ -142,6 +153,17 @@ class InstanceGroupParams(CoreModel):
Field(description="The resources requirements"),
] = ResourcesSpec()

blocks: Annotated[
Union[Literal["auto"], int],
Field(
description=(
"The amount of blocks to split the instance into, a number or `auto` (e.g., `4`)."
" Defaults to `1` (do not split)"
),
ge=1,
),
] = 1

backends: Annotated[
Optional[List[BackendType]],
Field(description="The backends to consider for provisioning (e.g., `[aws, gcp]`)"),
Expand Down
12 changes: 11 additions & 1 deletion src/dstack/_internal/core/models/instances.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import List, Optional
from typing import List, Literal, Optional, Union

import gpuhunt
from pydantic import root_validator
Expand Down Expand Up @@ -110,6 +110,11 @@ def get_public_keys(self) -> List[str]:
return [ssh_key.public.strip() for ssh_key in self.ssh_keys]


class InstanceSharedInfo(CoreModel):
total_blocks: Union[Literal["auto"], int]
busy_blocks: int = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about storing busy_blocks in json vs in a separate column. Any particular reasons to do so? I'm generally inclined to keep static data in json and changing fields like this as separate columns.

Maybe using json is more flexible if we take a completely different approach instead of relying on blocks, I don't know.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with two integer columns 8b555f7



class InstanceRuntime(Enum):
SHIM = "shim"
RUNNER = "runner"
Expand Down Expand Up @@ -143,6 +148,11 @@ class InstanceOfferWithAvailability(InstanceOffer):
instance_runtime: InstanceRuntime = InstanceRuntime.SHIM


class InstanceSharedOffer(InstanceOfferWithAvailability):
blocks: int
total_blocks: int


class InstanceStatus(str, Enum):
PENDING = "pending"
PROVISIONING = "provisioning"
Expand Down
4 changes: 3 additions & 1 deletion src/dstack/_internal/core/models/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ class Instance(CoreModel):
fleet_name: Optional[str] = None
instance_num: int
pool_name: Optional[str] = None
job_name: Optional[str] = None
job_name: Optional[str] = None # deprecated, always None (instance can have more than one job)
hostname: Optional[str] = None
status: InstanceStatus
unreachable: bool = False
termination_reason: Optional[str] = None
created: datetime.datetime
region: Optional[str] = None
price: Optional[float] = None
total_blocks: Optional[int] = None
busy_blocks: Optional[int] = None


class PoolInstances(CoreModel):
Expand Down
9 changes: 7 additions & 2 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Type
from typing import Any, Dict, List, Optional, Type, Union

from pydantic import UUID4, Field, root_validator
from typing_extensions import Annotated
Expand All @@ -13,6 +13,7 @@
)
from dstack._internal.core.models.instances import (
InstanceOfferWithAvailability,
InstanceSharedOffer,
InstanceType,
SSHConnectionParams,
)
Expand Down Expand Up @@ -239,6 +240,10 @@ class JobRuntimeData(CoreModel):
# None if data is not yet available (on vm-based backends and ssh instances)
# or not applicable (container-based backends)
ports: Optional[dict[int, int]] = None
# List of volumes used by the job
volume_names: Optional[list[str]] = None # None for backward compatibility
# Virtual shared offer. None if the instance is not shared.
offer: Optional[InstanceSharedOffer] = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's more useful to always store the offer used by the job, not just when it was shared?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in 8b555f7

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's crucial now to add a description of JobRuntimeData and what should go there so that it does not become a catch-all struct for everything. It probably makes sense to add a similar description to JobProvisioningData to differentiate it from JobRuntimeData.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



class ClusterInfo(CoreModel):
Expand Down Expand Up @@ -416,7 +421,7 @@ def _error(cls, values) -> Dict:

class JobPlan(CoreModel):
job_spec: JobSpec
offers: List[InstanceOfferWithAvailability]
offers: List[Union[InstanceSharedOffer, InstanceOfferWithAvailability]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to introduce different types of offers to the API? Can't every InstanceOfferWithAvailability be InstanceSharedOffer with total_blocks/blocks = 1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

total_offers: int
max_price: Optional[float]

Expand Down
60 changes: 51 additions & 9 deletions src/dstack/_internal/server/background/tasks/process_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
get_instance_profile,
get_instance_provisioning_data,
get_instance_requirements,
get_instance_shared_info,
)
from dstack._internal.server.services.runner import client as runner_client
from dstack._internal.server.services.runner.client import HealthStatus
Expand Down Expand Up @@ -133,7 +134,7 @@ async def _process_next_instance():
),
InstanceModel.id.not_in(lockset),
)
.options(lazyload(InstanceModel.job))
.options(lazyload(InstanceModel.jobs))
.order_by(InstanceModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
Expand All @@ -156,15 +157,15 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel):
select(InstanceModel)
.where(InstanceModel.id == instance.id)
.options(joinedload(InstanceModel.project).joinedload(ProjectModel.backends))
.options(joinedload(InstanceModel.job))
.options(joinedload(InstanceModel.jobs))
.options(joinedload(InstanceModel.fleet).joinedload(FleetModel.instances))
.execution_options(populate_existing=True)
)
instance = res.unique().scalar_one()
if (
instance.status == InstanceStatus.IDLE
and instance.termination_policy == TerminationPolicy.DESTROY_AFTER_IDLE
and instance.job_id is None
and not instance.jobs
):
await _mark_terminating_if_idle_duration_expired(instance)
if instance.status == InstanceStatus.PENDING:
Expand Down Expand Up @@ -322,6 +323,30 @@ async def _add_remote(instance: InstanceModel) -> None:
)
return

shared_info = get_instance_shared_info(instance)
if shared_info is not None:
resources = instance_type.resources
blocks = shared_info.total_blocks
if blocks == "auto":
blocks = len(resources.gpus)
if blocks > 1:
if len(resources.gpus) % blocks or resources.cpus % blocks:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Cannot split into blocks"
logger.warning(
"Failed to add instance %s: cannot split into blocks",
instance.name,
extra={
"instance_name": instance.name,
"instance_status": InstanceStatus.TERMINATED.value,
},
)
return
shared_info.total_blocks = blocks
instance.shared_info = shared_info.json()
else:
instance.shared_info = None

region = instance.region
jpd = JobProvisioningData(
backend=BackendType.REMOTE,
Expand Down Expand Up @@ -439,10 +464,11 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
instance_configuration = get_instance_configuration(instance)
profile = get_instance_profile(instance)
requirements = get_instance_requirements(instance)
shared_info = get_instance_shared_info(instance)
except ValidationError as e:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = (
f"Error to parse profile, requirements or instance_configuration: {e}"
f"Error to parse profile, requirements, shared_info or instance_configuration: {e}"
)
instance.last_retry_at = get_current_datetime()
logger.warning(
Expand Down Expand Up @@ -473,12 +499,18 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
)
return

if shared_info is None:
blocks = None
else:
blocks = shared_info.total_blocks

offers = await get_create_instance_offers(
project=instance.project,
profile=profile,
requirements=requirements,
exclude_not_available=True,
fleet_model=instance.fleet,
blocks=blocks,
)

if not offers and should_retry:
Expand Down Expand Up @@ -557,6 +589,18 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
instance.started_at = get_current_datetime()
instance.last_retry_at = get_current_datetime()

if shared_info is not None:
if blocks == "auto":
blocks = len(instance_offer.instance.resources.gpus)
if blocks > 1:
shared_info.total_blocks = blocks
else:
shared_info = None
if shared_info is not None:
instance.shared_info = shared_info.json()
else:
instance.shared_info = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shared_info updating is convoluted. I believe it can be created with total_blocks: "auto" and then it's overridden. I don't see much value in that. I'd keep the user's configured blocks as a static property somewhere and wouldn't touch it. total_blocks can then be resolved when instance resources are known. No need to allow for auto.

Copy link
Collaborator Author

@un-def un-def Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with NULL -> calculated auto blocks in 8b555f7

NULL as "info is not yet available" seems to fit there


logger.info(
"Created instance %s",
instance.name,
Expand Down Expand Up @@ -585,8 +629,8 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
async def _check_instance(instance: InstanceModel) -> None:
if (
instance.status == InstanceStatus.BUSY
and instance.job is not None
and instance.job.status.is_finished()
and instance.jobs
and all(job.status.is_finished() for job in instance.jobs)
):
# A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068
instance.status = InstanceStatus.TERMINATING
Expand Down Expand Up @@ -648,9 +692,7 @@ async def _check_instance(instance: InstanceModel) -> None:
instance.unreachable = False

if instance.status == InstanceStatus.PROVISIONING:
instance.status = (
InstanceStatus.IDLE if instance.job_id is None else InstanceStatus.BUSY
)
instance.status = InstanceStatus.IDLE if not instance.jobs else InstanceStatus.BUSY
logger.info(
"Instance %s has switched to %s status",
instance.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async def _process_next_running_job():
.limit(1)
.with_for_update(skip_locked=True)
)
job_model = res.scalar()
job_model = res.unique().scalar()
if job_model is None:
return
lockset.add(job_model.id)
Expand All @@ -102,7 +102,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
.options(joinedload(JobModel.instance))
.execution_options(populate_existing=True)
)
job_model = res.scalar_one()
job_model = res.unique().scalar_one()
res = await session.execute(
select(RunModel)
.where(RunModel.id == job_model.run_id)
Expand Down
Loading