Batch inference round 1/n (#309)
* Moved evaluator to jobs dir

* Fixed breaking evaluator integration test

* Specified minor version for backend Python

* Added first draft of inference job

* Addressing review comments

* Removed timing function

* Minor fixes

* Addressed review comments

---------

Signed-off-by: Davide Eynard <[email protected]>
aittalam authored Oct 25, 2024
1 parent af0f2df commit db3197b
Showing 89 changed files with 327 additions and 40 deletions.
6 changes: 4 additions & 2 deletions docker-compose.yaml
@@ -105,8 +105,10 @@ services:
- AWS_ENDPOINT_URL=http://localhost:4566
- S3_BUCKET=lumigator-storage
- PYTHONPATH=/mzai/lumigator/python/mzai/backend
- PIP_REQS=/mzai/lumigator/python/mzai/evaluator/requirements.txt
- EVALUATOR_WORK_DIR=/mzai/lumigator/python/mzai/evaluator
- EVALUATOR_PIP_REQS=/mzai/lumigator/python/mzai/jobs/evaluator/requirements.txt
- EVALUATOR_WORK_DIR=/mzai/lumigator/python/mzai/jobs/evaluator
- INFERENCE_PIP_REQS=/mzai/lumigator/python/mzai/jobs/inference/requirements.txt
- INFERENCE_WORK_DIR=/mzai/lumigator/python/mzai/jobs/inference
- RAY_DASHBOARD_PORT=8265
- RAY_HEAD_NODE_HOST=ray
- MISTRAL_API_KEY=${MISTRAL_API_KEY}
2 changes: 1 addition & 1 deletion lumigator/python/mzai/backend/.python-version
@@ -1 +1 @@
3.11
3.11.9
5 changes: 3 additions & 2 deletions lumigator/python/mzai/backend/backend/api/routes/jobs.py
@@ -14,14 +14,15 @@
router = APIRouter()


@router.post("/inference", status_code=status.HTTP_201_CREATED)
@router.post("/inference/", status_code=status.HTTP_201_CREATED)
def create_inference_job(
service: JobServiceDep,
request: JobCreate,
) -> JobResponse:
return service.create_inference_job(request)

@router.post("/evaluate", status_code=status.HTTP_201_CREATED)

@router.post("/evaluate/", status_code=status.HTTP_201_CREATED)
def create_evaluation_job(
service: JobServiceDep,
request: JobCreate,
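
For orientation, a minimal sketch of how a client could call the new trailing-slash job routes once the backend is running. The base URL, API prefix, and some payload field names are assumptions for illustration; only the fields referenced elsewhere in this diff (dataset, model, system_prompt, config templates) are taken from the source.

```python
# Hypothetical call against the new trailing-slash routes; base URL, prefix,
# and several payload fields are illustrative assumptions, not a fixed API.
import requests

payload = {
    "name": "summarization-inference-demo",             # assumed field
    "model": "oai://gpt-4o-mini",                        # "oai://" prefix is handled in services/jobs.py
    "dataset": "00000000-0000-0000-0000-000000000000",   # id of a previously uploaded dataset
    "max_samples": 10,                                   # assumed field
}

resp = requests.post("http://localhost:8000/api/v1/jobs/inference/", json=payload)
resp.raise_for_status()
print(resp.json())  # JobResponse for the newly created inference job
```
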
31 changes: 8 additions & 23 deletions lumigator/python/mzai/backend/backend/services/jobs.py
@@ -66,9 +66,7 @@ def _get_results_s3_key(self, job_id: UUID) -> str:

return str(
Path(settings.S3_JOB_RESULTS_PREFIX)
/ settings.S3_JOB_RESULTS_FILENAME.format(
job_name=record.name, job_id=record.id
)
/ settings.S3_JOB_RESULTS_FILENAME.format(job_name=record.name, job_id=record.id)
)

def create_inference_job(self, request: JobCreate) -> JobResponse:
@@ -80,7 +78,7 @@ def create_inference_job(self, request: JobCreate) -> JobResponse:
dataset_s3_path = self.data_service.get_dataset_s3_path(request.dataset)

# set storage path
storage_path = f"s3://{Path(settings.S3_BUCKET) / settings.S3_JOB_RESULTS_PREFIX}/"
storage_path = f"s3://{ Path(settings.S3_BUCKET) / settings.S3_JOB_RESULTS_PREFIX }/"

# fill up model url with default openai url
if request.model.startswith("oai://"):
@@ -105,7 +103,6 @@ def create_inference_job(self, request: JobCreate) -> JobResponse:
"system_prompt": request.system_prompt,
}


# load a config template and fill it up with config_params
if request.config_infer_template is not None:
config_template = request.config_infer_template
@@ -121,16 +118,13 @@ def create_inference_job(self, request: JobCreate) -> JobResponse:
"--config": config_template.format(**config_params),
}

#TODO Add inference module as entrypoint
infer_command = f"{settings.LD_PRELOAD_PREFIX} python -m inference infer huggingface"

# Prepare the job configuration that will be sent to submit the ray job.
# This includes both the command that is going to be executed and its
# arguments defined in infer_config_args
ray_config = JobConfig(
job_id=record.id,
job_type=JobType.INFERENCE,
command=infer_command,
command=settings.INFERENCE_COMMAND,
args=infer_config_args,
)

@@ -145,8 +139,8 @@ def create_inference_job(self, request: JobCreate) -> JobResponse:
worker_gpus = settings.RAY_WORKER_GPUS

runtime_env = {
"pip": settings.PIP_REQS,
"working_dir": settings.EVALUATOR_WORK_DIR,
"pip": settings.INFERENCE_PIP_REQS,
"working_dir": settings.INFERENCE_WORK_DIR,
"env_vars": runtime_env_vars,
}

@@ -215,20 +209,13 @@ def create_evaluation_job(self, request: JobCreate) -> JobResponse:
"--config": config_template.format(**config_params),
}

# Pre-loading libgomp with LD_PRELOAD resolves allocation issues on aarch64
# (see https://github.com/mozilla-ai/lumigator/issues/156). The path where
# libs are stored on worker nodes contains a hash that depends on the
# installed libraries, so we get it dynamically right before running the
# command (more info in settings.py)
eval_command = f"{settings.LD_PRELOAD_PREFIX} python -m evaluator evaluate huggingface"

# Prepare the job configuration that will be sent to submit the ray job.
# This includes both the command that is going to be executed and its
# arguments defined in eval_config_args
ray_config = JobConfig(
job_id=record.id,
job_type=JobType.EVALUATION,
command=eval_command,
command=settings.EVALUATOR_COMMAND,
args=eval_config_args,
)

@@ -243,7 +230,7 @@ def create_evaluation_job(self, request: JobCreate) -> JobResponse:
worker_gpus = settings.RAY_WORKER_GPUS

runtime_env = {
"pip": settings.PIP_REQS,
"pip": settings.EVALUATOR_PIP_REQS,
"working_dir": settings.EVALUATOR_WORK_DIR,
"env_vars": runtime_env_vars,
}
@@ -298,9 +285,7 @@ def get_job_result(self, job_id: UUID) -> JobResultResponse:
)
return JobResultResponse.model_validate(result_record)

def get_job_result_download(
self, job_id: UUID
) -> JobResultDownloadResponse:
def get_job_result_download(self, job_id: UUID) -> JobResultDownloadResponse:
"""Return job results file URL for downloading."""
# Generate presigned download URL for the object
result_key = self._get_results_s3_key(job_id)
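
The JobConfig assembled above ultimately drives a Ray job submission. A sketch of the general shape of that hand-off using Ray's public JobSubmissionClient follows; the dashboard address and the exact entrypoint string are illustrative assumptions, while the runtime_env keys mirror this diff.

```python
# Sketch of the Ray submission that a runtime_env like the one above feeds into.
# The actual call lives in the backend's Ray client code, not in this file.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://ray:8265")  # RAY_HEAD_NODE_HOST:RAY_DASHBOARD_PORT

runtime_env = {
    "pip": "/mzai/lumigator/python/mzai/jobs/inference/requirements.txt",  # INFERENCE_PIP_REQS
    "working_dir": "/mzai/lumigator/python/mzai/jobs/inference",           # INFERENCE_WORK_DIR
    "env_vars": {},  # populated via settings.inherit_ray_env(...) in the service
}

submission_id = client.submit_job(
    entrypoint="python inference.py --config '{\"name\": \"demo\"}'",  # INFERENCE_COMMAND plus the --config arg
    runtime_env=runtime_env,
)
print(submission_id)
```

In the real service the --config argument comes from the args dict built from the config template, so the entrypoint above only illustrates the combined command line.
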
20 changes: 17 additions & 3 deletions lumigator/python/mzai/backend/backend/settings.py
@@ -42,8 +42,24 @@ class BackendSettings(BaseSettings):
MISTRAL_API_URL: str = "https://api.mistral.ai/v1"
DEFAULT_SUMMARIZER_PROMPT: str = "You are a helpful assistant, expert in text summarization. For every prompt you receive, provide a summary of its contents in at most two sentences." # noqa: E501

# Eval
# Eval job details
EVALUATOR_WORK_DIR: str | None = None
EVALUATOR_PIP_REQS: str | None = None

@computed_field
@property
def EVALUATOR_COMMAND(self) -> str: # noqa: N802
"""Returns the command required to run evaluator.
The prefix is provided to fix an issue loading libgomp (an sklearn dependency)
on the aarch64 ray image (see LD_PRELOAD_PREFIX definition below for more details)
"""
return f"{self.LD_PRELOAD_PREFIX} python -m evaluator evaluate huggingface"

# Inference job details
INFERENCE_WORK_DIR: str | None = None
INFERENCE_PIP_REQS: str | None = None
INFERENCE_COMMAND: str = "python inference.py"

def inherit_ray_env(self, runtime_env_vars: Mapping[str, str]):
for env_var_name in self.RAY_WORKER_ENV_VARS:
@@ -83,8 +99,6 @@ def RAY_WORKER_GPUS(self) -> float: # noqa: N802
def RAY_WORKER_GPUS_FRACTION(self) -> float: # noqa: N802
return float(os.environ.get(self.RAY_WORKER_GPUS_FRACTION_ENV_VAR, 1.0))

PIP_REQS: str | None = None

@computed_field
@property
def RAY_DASHBOARD_URL(self) -> str: # noqa: N802
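
A quick illustration of how the new per-job settings resolve at runtime. The environment values mirror docker-compose.yaml above, and the import path follows the PYTHONPATH set there; unlike EVALUATOR_COMMAND, which is computed with the LD_PRELOAD prefix, INFERENCE_COMMAND is a plain default.

```python
# Illustrative only: how the new per-job settings resolve at runtime.
import os

from backend.settings import BackendSettings  # import path implied by PYTHONPATH in docker-compose.yaml

os.environ["INFERENCE_PIP_REQS"] = "/mzai/lumigator/python/mzai/jobs/inference/requirements.txt"
os.environ["INFERENCE_WORK_DIR"] = "/mzai/lumigator/python/mzai/jobs/inference"

settings = BackendSettings()
print(settings.INFERENCE_PIP_REQS)  # value read from the environment, None when unset
print(settings.INFERENCE_WORK_DIR)
print(settings.INFERENCE_COMMAND)   # "python inference.py" (plain default, no LD_PRELOAD prefix)
```
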
@@ -10,17 +10,19 @@

@app.on_event("startup")
def test_health_ok(local_client: TestClient):
response = local_client.get("/health/")
assert response.status_code == 200
response = local_client.get("/health/")
assert response.status_code == 200


def test_upload_data_launch_job(local_client: TestClient, dialog_dataset):
response = local_client.get("/health")
assert response.status_code == 200

create_response = local_client.post("/datasets",
data={},
files={"dataset": dialog_dataset, "format": (None, DatasetFormat.JOB.value)},
)
create_response = local_client.post(
"/datasets/",
data={},
files={"dataset": dialog_dataset, "format": (None, DatasetFormat.JOB.value)},
)

assert create_response.status_code == 201

@@ -40,9 +42,15 @@ def test_upload_data_launch_job(local_client: TestClient, dialog_dataset):
"config_template": "string",
}

create_experiment_response = local_client.post("/jobs/evaluate", headers=headers, json=payload
create_evaluation_job_response = local_client.post(
"/jobs/evaluate/", headers=headers, json=payload
)
assert create_evaluation_job_response.status_code == 201

create_inference_job_response = local_client.post(
"/jobs/inference/", headers=headers, json=payload
)
assert create_experiment_response.status_code == 201
assert create_inference_job_response.status_code == 201


def test_experiment_non_existing(local_client: TestClient):
File renamed without changes.
@@ -12,7 +12,7 @@

@pytest.fixture(scope="session")
def examples_dir():
return Path(__file__).parents[1] / "examples"
return Path(__file__).parents[2] / "examples"


@pytest.fixture(scope="session")
1 change: 1 addition & 0 deletions lumigator/python/mzai/jobs/inference/README.md
@@ -0,0 +1 @@
# Inference Documentation
127 changes: 127 additions & 0 deletions lumigator/python/mzai/jobs/inference/inference.py
@@ -0,0 +1,127 @@
"""python job to run batch inference"""

import argparse
import json
from collections.abc import Iterable
from pathlib import Path

import s3fs
from box import Box
from datasets import load_from_disk
from loguru import logger
from model_clients import (
BaseModelClient,
MistralModelClient,
OpenAIModelClient,
)
from tqdm import tqdm


def predict(dataset_iterable: Iterable, model_client: BaseModelClient) -> list:
predictions = []

for sample_txt in dataset_iterable:
predictions.append(model_client.predict(sample_txt))

return predictions


def save_to_disk(local_path: Path, data_dict: dict):
logger.info(f"Storing into {local_path}...")
local_path.parent.mkdir(exist_ok=True, parents=True)
with local_path.open("w") as f:
json.dump(data_dict, f)


def save_to_s3(config: Box, local_path: Path, storage_path: str):
s3 = s3fs.S3FileSystem()
if storage_path.endswith("/"):
storage_path = "s3://" + str(
Path(storage_path[5:]) / config.name / "inference_results.json"
)
logger.info(f"Storing into {storage_path}...")
s3.put_file(local_path, storage_path)


def save_outputs(config: Box, inference_results: dict) -> Path:
storage_path = config.evaluation.storage_path

# generate local temp file ANYWAY:
# - if storage_path is not provided, it will be stored and kept into a default dir
# - if storage_path is provided AND saving to S3 is successful, local file is deleted
local_path = Path(
Path.home() / ".lumigator" / "results" / config.name / "inference_results.json"
)

try:
save_to_disk(local_path, inference_results)

# copy to s3 and return path
if storage_path is not None and storage_path.startswith("s3://"):
save_to_s3(config, local_path, storage_path)
Path.unlink(local_path)
Path.rmdir(local_path.parent)
return storage_path
else:
return local_path

except Exception as e:
logger.error(e)


def run_inference(config: Box) -> Path:
# initialize output dictionary
output = {}

# Load dataset given its URI
dataset = load_from_disk(config.dataset.path)

# Limit dataset length if max_samples is specified
max_samples = config.evaluation.max_samples
if max_samples is not None and max_samples > 0:
if max_samples > len(dataset):
logger.info(f"max_samples ({max_samples}) resized to dataset size ({len(dataset)})")
max_samples = len(dataset)
dataset = dataset.select(range(max_samples))

# Enable / disable tqdm
input_samples = dataset["examples"]
dataset_iterable = tqdm(input_samples) if config.evaluation.enable_tqdm else input_samples

# Choose which model client to use
if config.model.inference is not None:
# a model *inference service* is passed
base_url = config.model.inference.base_url
output_model_name = config.model.inference.engine
if "mistral" in base_url:
# run the mistral client
logger.info(f"Using Mistral client. Endpoint: {base_url}")
model_client = MistralModelClient(base_url, config.model)
else:
# run the openai client
logger.info(f"Using OAI client. Endpoint: {base_url}")
model_client = OpenAIModelClient(base_url, config.model)

# run inference
output["predictions"] = predict(dataset_iterable, model_client)
output["examples"] = dataset["examples"]
output["ground_truth"] = dataset["ground_truth"]
output["model"] = output_model_name

output_path = save_outputs(config, output)
return output_path


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config", type=str, help="Configuration in JSON format")
args = parser.parse_args()

if not args.config:
parser.print_help() # Print the usage message and exit
err_str = "No input configuration provided. Please pass one using the --config flag"
logger.error(err_str)
else:
config = json.loads(args.config)
result_dataset_path = run_inference(Box(config, default_box=True, default_box_attr=None))
logger.info(f"Inference results stored at {result_dataset_path}")