Support storing code diffs on s3 (#764)
* Support storing code diffs on s3

* Specify aws bucket region via env
r4victor authored Nov 7, 2023
1 parent 33c8a6d commit e8e3fb9
Showing 7 changed files with 140 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/dstack/_internal/server/app.py
@@ -24,6 +24,7 @@
 )
 from dstack._internal.server.services.config import ServerConfigManager
 from dstack._internal.server.services.projects import get_or_create_default_project
+from dstack._internal.server.services.storage import init_default_storage
 from dstack._internal.server.services.users import get_or_create_admin_user
 from dstack._internal.server.settings import DEFAULT_PROJECT_NAME, SERVER_URL
 from dstack._internal.server.utils.logging import configure_logging
@@ -100,6 +101,8 @@ async def lifespan(app: FastAPI):
     create_default_project_config(
         project_name=DEFAULT_PROJECT_NAME, url=SERVER_URL, token=admin.token
     )
+    if settings.SERVER_BUCKET is not None:
+        init_default_storage()
     scheduler = start_background_tasks()
     dstack_version = dstack.version.__version__ if dstack.version.__version__ else "(no version)"
     print(f"\nThe dstack server {dstack_version} is running at {SERVER_URL}.")
@@ -11,7 +11,7 @@
 from dstack._internal.core.models.repos import RemoteRepoCreds
 from dstack._internal.core.models.runs import Job, JobErrorCode, JobStatus, Run
 from dstack._internal.server.db import get_session_ctx
-from dstack._internal.server.models import JobModel, RepoModel, RunModel
+from dstack._internal.server.models import JobModel, ProjectModel, RepoModel, RunModel
 from dstack._internal.server.services import logs as logs_services
 from dstack._internal.server.services.jobs import (
     RUNNING_PROCESSING_JOBS_IDS,
@@ -26,6 +26,7 @@
     create_job_model_for_new_submission,
     run_model_to_run,
 )
+from dstack._internal.server.services.storage import get_default_storage
 from dstack._internal.server.utils.common import run_async
 from dstack._internal.utils import common as common_utils
 from dstack._internal.utils.interpolator import VariablesInterpolator
@@ -99,6 +100,7 @@ async def _process_job(job_id: UUID):
         logger.debug("Polling provisioning job without shim: %s", job_model.job_name)
         code = await _get_job_code(
             session=session,
+            project=project,
             repo=repo_model,
             code_hash=run.run_spec.repo_code_hash,
         )
@@ -128,6 +130,7 @@ async def _process_job(job_id: UUID):
         logger.debug("Polling pulling job with shim: %s", job_model.job_name)
         code = await _get_job_code(
             session=session,
+            project=project,
             repo=repo_model,
             code_hash=run.run_spec.repo_code_hash,
         )
@@ -312,11 +315,22 @@ def _process_running(
     return True


-async def _get_job_code(session: AsyncSession, repo: RepoModel, code_hash: str) -> bytes:
+async def _get_job_code(
+    session: AsyncSession, project: ProjectModel, repo: RepoModel, code_hash: str
+) -> bytes:
     code_model = await get_code_model(session=session, repo=repo, code_hash=code_hash)
-    if code_model is not None:
-        return code_model.blob
-    return b""
+    if code_model is None:
+        return b""
+    storage = get_default_storage()
+    if storage is None or code_model.blob is not None:
+        return code_model.blob
+    blob = await run_async(
+        storage.get_code,
+        project.name,
+        repo.name,
+        code_hash,
+    )
+    return blob


 def _submit_job_to_runner(
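A note on the read path: boto3 is synchronous, so _get_job_code hands storage.get_code to run_async instead of calling it directly on the event loop. The helper itself is not part of this diff; the minimal sketch below only shows what such a helper is assumed to do, and the real dstack._internal.server.utils.common.run_async may differ.

import asyncio
from functools import partial


# Assumed behavior: offload a blocking callable to a worker thread so the
# asyncio event loop keeps serving other requests while S3 I/O is in flight.
async def run_async(func, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, partial(func, *args))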
@@ -0,0 +1,32 @@
"""Make blob nullable

Revision ID: 23e01c56279a
Revises: 112753bc17dd
Create Date: 2023-11-06 16:13:00.455543

"""
import sqlalchemy as sa
import sqlalchemy_utils
from alembic import op

# revision identifiers, used by Alembic.
revision = "23e01c56279a"
down_revision = "112753bc17dd"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("codes", schema=None) as batch_op:
        batch_op.alter_column("blob", existing_type=sa.BLOB(), nullable=True)

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("codes", schema=None) as batch_op:
        batch_op.alter_column("blob", existing_type=sa.BLOB(), nullable=False)

    # ### end Alembic commands ###
2 changes: 1 addition & 1 deletion src/dstack/_internal/server/models.py
@@ -132,7 +132,7 @@ class CodeModel(BaseModel):
     repo_id: Mapped[UUIDType] = mapped_column(ForeignKey("repos.id", ondelete="CASCADE"))
     repo: Mapped["RepoModel"] = relationship()
     blob_hash: Mapped[str] = mapped_column(String(4000), unique=True)
-    blob: Mapped[bytes] = mapped_column(BLOB)
+    blob: Mapped[Optional[bytes]] = mapped_column(BLOB)  # None means blob is stored on s3


 class RunModel(BaseModel):
22 changes: 17 additions & 5 deletions src/dstack/_internal/server/services/repos.py
@@ -15,7 +15,10 @@
     RepoHeadWithCreds,
 )
 from dstack._internal.core.models.repos.remote import RemoteRepoCreds
+from dstack._internal.server import settings
 from dstack._internal.server.models import CodeModel, ProjectModel, RepoModel
+from dstack._internal.server.services.storage import get_default_storage
+from dstack._internal.server.utils.common import run_async


 async def list_repos(
@@ -145,11 +148,20 @@ async def upload_code(
     if code is not None:
         return
     blob = await file.read()
-    code = CodeModel(
-        repo_id=repo.id,
-        blob_hash=code_hash,
-        blob=blob,
-    )
+    storage = get_default_storage()
+    if storage is None:
+        code = CodeModel(
+            repo_id=repo.id,
+            blob_hash=code_hash,
+            blob=blob,
+        )
+    else:
+        code = CodeModel(
+            repo_id=repo.id,
+            blob_hash=code_hash,
+            blob=None,
+        )
+        await run_async(storage.upload_code, project.name, repo.name, code.blob_hash, blob)
     session.add(code)
     await session.commit()

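In short, upload_code now branches on whether a default storage is configured: without one, the tarball bytes still go into the blob column; with one, the row keeps only the hash and the bytes are pushed to the bucket under the key layout defined in the new storage module below. A purely illustrative sketch; the hash and bucket name are made up:

# Without DSTACK_SERVER_BUCKET set:
#   CodeModel(repo_id=..., blob_hash="0a1b2c", blob=b"<tar bytes>")
# With DSTACK_SERVER_BUCKET=my-dstack-bucket:
#   CodeModel(repo_id=..., blob_hash="0a1b2c", blob=None)
#   plus an object at s3://my-dstack-bucket/data/projects/<project>/codes/<repo>/0a1b2c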
66 changes: 66 additions & 0 deletions src/dstack/_internal/server/services/storage.py
@@ -0,0 +1,66 @@
from typing import Optional

import botocore.exceptions
from boto3 import Session

from dstack._internal.server import settings


class S3Storage:
    def __init__(
        self,
        bucket: str,
        region: str,
    ):
        self._session = Session()
        self._client = self._session.client("s3", region_name=region)
        self.bucket = bucket

    def upload_code(
        self,
        project_id: str,
        repo_id: str,
        code_hash: str,
        blob: bytes,
    ):
        self._client.put_object(
            Bucket=self.bucket,
            Key=_get_code_key(project_id, repo_id, code_hash),
            Body=blob,
        )

    def get_code(
        self,
        project_id: str,
        repo_id: str,
        code_hash: str,
    ) -> Optional[bytes]:
        try:
            response = self._client.get_object(
                Bucket=self.bucket,
                Key=_get_code_key(project_id, repo_id, code_hash),
            )
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return None
            raise e
        return response["Body"].read()


def _get_code_key(project_id: str, repo_id: str, code_hash: str) -> str:
    return f"data/projects/{project_id}/codes/{repo_id}/{code_hash}"


_default_storage = None


def init_default_storage():
    global _default_storage
    _default_storage = S3Storage(
        bucket=settings.SERVER_BUCKET,
        region=settings.SERVER_BUCKET_REGION,
    )


def get_default_storage() -> Optional[S3Storage]:
    return _default_storage
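A minimal end-to-end sketch of how the new module is meant to be used, assuming DSTACK_SERVER_BUCKET points at an existing bucket, boto3 can resolve AWS credentials from its default chain, and run_async behaves as described above; the project, repo, and hash values are made up:

import asyncio

from dstack._internal.server.services.storage import (
    get_default_storage,
    init_default_storage,
)
from dstack._internal.server.utils.common import run_async


async def main():
    # DSTACK_SERVER_BUCKET (and optionally DSTACK_SERVER_BUCKET_REGION) must be
    # set in the environment before dstack._internal.server.settings is imported.
    init_default_storage()
    storage = get_default_storage()
    assert storage is not None

    project, repo, code_hash = "main", "my-repo", "0a1b2c"  # made-up identifiers

    # boto3 calls are blocking, so they are offloaded to a thread via run_async.
    await run_async(storage.upload_code, project, repo, code_hash, b"tar bytes")
    blob = await run_async(storage.get_code, project, repo, code_hash)
    assert blob == b"tar bytes"


if __name__ == "__main__":
    asyncio.run(main())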
3 changes: 3 additions & 0 deletions src/dstack/_internal/server/settings.py
@@ -29,6 +29,9 @@
 SERVER_CONFIG_ENABLED = not SERVER_CONFIG_DISABLED
 LOCAL_BACKEND_ENABLED = os.getenv("DSTACK_LOCAL_BACKEND_ENABLED") is not None

+SERVER_BUCKET = os.getenv("DSTACK_SERVER_BUCKET")
+SERVER_BUCKET_REGION = os.getenv("DSTACK_SERVER_BUCKET_REGION", "eu-west-1")
+
 DEFAULT_PROJECT_NAME = "main"
 DEFAULT_GATEWAY_NAME = "default-gateway"

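Configuration is environment-only in this change: DSTACK_SERVER_BUCKET enables S3 storage, and DSTACK_SERVER_BUCKET_REGION overrides the eu-west-1 default. A small sketch with a made-up bucket name; in practice these variables would be exported before starting the dstack server:

import os

# Made-up bucket; it must already exist and be reachable with the server's AWS credentials.
os.environ["DSTACK_SERVER_BUCKET"] = "my-dstack-bucket"
os.environ["DSTACK_SERVER_BUCKET_REGION"] = "us-east-1"  # optional, defaults to eu-west-1

# settings reads the variables at import time; app.py then calls
# init_default_storage() only when SERVER_BUCKET is not None.
from dstack._internal.server import settings

assert settings.SERVER_BUCKET == "my-dstack-bucket"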
