From 920314291da19b2ca1c44e18c41cf9de08cede53 Mon Sep 17 00:00:00 2001 From: Andreea Popescu Date: Wed, 22 Jan 2025 08:35:17 +0000 Subject: [PATCH] fix streaming synthetic jobs --- .../compute_horde/miner_client/base.py | 4 +- .../src/compute_horde_validator/settings.py | 10 + .../validator/synthetic_jobs/batch_run.py | 235 +++++++++++------- .../synthetic_jobs/generator/base.py | 25 +- .../synthetic_jobs/generator/gpu_hashcat.py | 17 +- .../synthetic_jobs/generator/llm_prompts.py | 112 ++++++++- .../tests/test_synthetic_jobs/helpers.py | 7 +- .../test_synthetic_jobs/mock_generator.py | 15 +- .../tests/test_synthetic_jobs/test_batch.py | 5 +- .../test_multiple_miners.py | 93 +++++-- .../test_synthetic_jobs/test_single_miner.py | 3 +- .../validator/tests/test_utils.py | 28 +-- 12 files changed, 392 insertions(+), 162 deletions(-) diff --git a/compute_horde/compute_horde/miner_client/base.py b/compute_horde/compute_horde/miner_client/base.py index 16856b3a5..0380a112a 100644 --- a/compute_horde/compute_horde/miner_client/base.py +++ b/compute_horde/compute_horde/miner_client/base.py @@ -111,7 +111,9 @@ async def read_messages(self): try: msg = self.accepted_request_type().parse(msg) except ValidationError as ex: - error_msg = f"Malformed message from miner {self.miner_name}: {str(ex)}" + if msg == "PING": + continue + error_msg = f"Malformed message {msg} from miner {self.miner_name}: {str(ex)}" logger.info(error_msg) self.deferred_send_model(self.build_outgoing_generic_error(error_msg)) continue diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py index 92b6e92d2..37dbe828b 100644 --- a/validator/app/src/compute_horde_validator/settings.py +++ b/validator/app/src/compute_horde_validator/settings.py @@ -609,6 +609,16 @@ def wrapped(*args, **kwargs): "level": "WARNING", "propagate": True, }, + "botocore": { + "handlers": ["console"], + "level": "WARNING", + "propagate": True, + }, + "httpcore.http11": { + "handlers": ["console"], + "level": "WARNING", + "propagate": True, + }, }, } diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py index 9d363ea19..8e36eec02 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py @@ -86,7 +86,6 @@ BaseSyntheticJobGenerator, ) from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import ( - LlmPromptsJobGenerator, LlmPromptsSyntheticJobGenerator, ) from compute_horde_validator.validator.synthetic_jobs.scoring import get_manifest_multiplier @@ -1320,7 +1319,6 @@ async def _handle_job_accepted(client: MinerClient, ctx: BatchContext, job: Job) async def _send_job_request( ctx: BatchContext, start_barrier: asyncio.Barrier, - streaming_start_barrier: asyncio.Barrier | None, job_uuid: str, ) -> None: await start_barrier.wait() @@ -1343,6 +1341,7 @@ async def _send_job_request( request_json = request.model_dump_json() timeout = job.job_generator.timeout_seconds() + _JOB_RESPONSE_EXTRA_TIMEOUT + async with asyncio.timeout(timeout): # send can block, so take a timestamp # on both sides to detect long send times @@ -1350,13 +1349,87 @@ async def _send_job_request( await client.send_check(request_json) job.job_after_sent_time = datetime.now(tz=UTC) - # if streaming job - if streaming_start_barrier is not None: - await _trigger_streaming_job(ctx, streaming_start_barrier, job_uuid) - await job.job_response_event.wait() +async def _send_job_request_for_streaming( + ctx: BatchContext, + job_uuid: str, +): + job = ctx.jobs[job_uuid] + client = ctx.clients[job.miner_hotkey] + + request = V0JobRequest( + job_uuid=job.uuid, + executor_class=job.executor_class, + docker_image_name=job.job_generator.docker_image_name(), + docker_run_options_preset=job.job_generator.docker_run_options_preset(), + docker_run_cmd=job.job_generator.docker_run_cmd(), + raw_script=job.job_generator.raw_script(), + volume=job.volume if not job.job_generator.volume_in_initial_req() else None, + output_upload=job.output_upload, + ) + request_json = request.model_dump_json() + + job.job_before_sent_time = datetime.now(tz=UTC) + await client.send_check(request_json) + job.job_after_sent_time = datetime.now(tz=UTC) + + try: + async with asyncio.timeout(await job.job_generator.streaming_preparation_timeout()): + await job.streaming_job_ready_response_event.wait() + except TimeoutError: + logger.debug("Timeout waiting for StreamingJobReadyRequest for job_uuid=%s", job_uuid) + return + + logger.debug(f"Received streaming job ready response for {job_uuid}") + + response = job.streaming_job_ready_response + if not isinstance(response, V0StreamingJobReadyRequest): + logger.warning(f"Bad job ready response for {job_uuid}: {response}") + + +async def _multi_send_job_request_for_streaming( + ctx: BatchContext, + executor_ready_jobs: list[tuple[str, bool]], +): + streaming_jobs = [job_uuid for job_uuid, is_streaming in executor_ready_jobs if is_streaming] + if not streaming_jobs: + logger.debug("No streaming jobs to run") + return + logger.debug("Sending job request for %s streaming jobs", len(streaming_jobs)) + + tasks = [ + asyncio.create_task( + _send_job_request_for_streaming(ctx, job_uuid), + name=f"{job_uuid}._send_job_request_for_streaming", + ) + for job_uuid in streaming_jobs + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + exceptions: list[ExceptionInfo] = [] + for i, result in enumerate(results): + if isinstance(result, BaseException): + job_uuid = streaming_jobs[i][0] + job = ctx.jobs[job_uuid] + job.exception = result + job.exception_time = datetime.now(tz=UTC) + job.exception_stage = "_send_job_request_for_streaming" + exceptions.append( + ExceptionInfo( + exception=job.exception, + miner_hotkey=job.miner_hotkey, + job_uuid=job.uuid, + stage=job.exception_stage, + ) + ) + else: + assert result is None + _handle_exceptions(ctx, exceptions) + + async def _send_job_finished_receipts(ctx: BatchContext) -> None: for job in ctx.jobs.values(): # generate job finished receipts for all jobs @@ -1583,63 +1656,6 @@ async def _multi_send_initial_job_request(ctx: BatchContext) -> None: _handle_exceptions(ctx, exceptions) -async def _trigger_streaming_job( - ctx: BatchContext, streaming_start_barrier: asyncio.Barrier, job_uuid: str -) -> None: - job = ctx.jobs[job_uuid] - response = None - try: - timeout = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT") - async with asyncio.timeout(timeout): - await job.streaming_job_ready_response_event.wait() - logger.debug(f"Received streaming job ready response for {job_uuid}") - - response = job.streaming_job_ready_response - if not isinstance(response, V0StreamingJobReadyRequest): - return - - # Save job certificate received from executor - executor_cert_path = ctx.certs_basepath / "ssl" / f"executor_certificate_{job_uuid}.pem" - executor_cert_path.write_text(response.public_key) - - # provide the synthetic job prompt seed to the streaming job - if isinstance(job.job_generator, LlmPromptsJobGenerator): - seed = job.job_generator.seed - else: - logger.error(f"Bad streaming job generator type: {job.job_generator}") - return - - finally: - # !!! it's very important we wait on this barrier, no matter what happens above, - # if we don't wait, other concurrent jobs will hang forever since they will - # never pass this barrier - await streaming_start_barrier.wait() - - if not isinstance(response, V0StreamingJobReadyRequest): - return - - async with httpx.AsyncClient( - verify=str(executor_cert_path), - cert=ctx.own_certs, - timeout=job.job_generator.timeout_seconds(), - ) as client: - # send the seed to the executor to start the streaming job - url = f"https://{response.ip}:{response.port}/execute-job" - try: - r = await client.post(url, json={"seed": seed}, headers={"Host": response.ip}) - r.raise_for_status() - except Exception as e: - msg = f"Failed to execute streaming job {job_uuid} on {url}: {e}" - logger.warning(msg) - raise Exception(msg) - finally: - # schedule the job to terminate - url = f"https://{response.ip}:{response.port}/terminate" - r = await client.get(url, headers={"Host": response.ip}) - if r.status_code != 200: - logger.warning(f"Failed to terminate streaming job {job_uuid} on {url}") - - async def _get_executor_ready_jobs(ctx: BatchContext) -> list[tuple[str, bool]]: streaming_classes = await get_streaming_job_executor_classes() @@ -1656,42 +1672,72 @@ async def _get_executor_ready_jobs(ctx: BatchContext) -> list[tuple[str, bool]]: return executor_ready_jobs -async def _multi_send_job_request( +async def _trigger_job_execution( ctx: BatchContext, executor_ready_jobs: list[tuple[str, bool]] ) -> None: assert executor_ready_jobs - logger.info("Sending job requests for %d ready jobs", len(executor_ready_jobs)) + buffered_tasks = [] + streaming_tasks = [] + + start_barrier = asyncio.Barrier( + len( + [ + (job_uuid, is_streaming) + for job_uuid, is_streaming in executor_ready_jobs + if not is_streaming + or isinstance( + ctx.jobs[job_uuid].streaming_job_ready_response, V0StreamingJobReadyRequest + ) + ] + ) + ) - start_barrier = asyncio.Barrier(len(executor_ready_jobs)) + for job_uuid, is_streaming in executor_ready_jobs: + if not is_streaming: + buffered_tasks.append( + asyncio.create_task( + _send_job_request(ctx, start_barrier, job_uuid), + name=f"{job_uuid}._trigger_job_execution", + ) + ) + elif isinstance( + streaming_response := ctx.jobs[job_uuid].streaming_job_ready_response, + V0StreamingJobReadyRequest, + ): + streaming_tasks.append( + asyncio.create_task( + ctx.jobs[job_uuid].job_generator.trigger_streaming_job_execution( + job_uuid, + start_barrier, + streaming_response.public_key, + ctx.own_certs, + streaming_response.ip, + streaming_response.port, + ), + name=f"{job_uuid}._trigger_job_execution", + ) + ) + else: + logger.debug(f"Not triggering execution for streaming job {job_uuid}") - num_streaming_jobs = len( - [job_uuid for job_uuid, is_streaming in executor_ready_jobs if is_streaming] + logger.info( + "Sending job requests for %d ready jobs - %s streaming and %s buffered", + len(streaming_tasks) + len(buffered_tasks), + len(streaming_tasks), + len(buffered_tasks), ) - if num_streaming_jobs > 0: - streaming_start_barrier = asyncio.Barrier(num_streaming_jobs) - else: - streaming_start_barrier = None - tasks = [ - asyncio.create_task( - _send_job_request( - ctx, start_barrier, streaming_start_barrier if is_streaming else None, job_uuid - ), - name=f"{job_uuid}._send_job_request", - ) - for job_uuid, is_streaming in executor_ready_jobs - ] - - results = await asyncio.gather(*tasks, return_exceptions=True) + results = await asyncio.gather(*(streaming_tasks + buffered_tasks), return_exceptions=True) exceptions: list[ExceptionInfo] = [] for i, result in enumerate(results): if isinstance(result, BaseException): + logger.exception("When running Synthetic jobs", exc_info=result) job_uuid = executor_ready_jobs[i][0] job = ctx.jobs[job_uuid] job.exception = result job.exception_time = datetime.now(tz=UTC) - job.exception_stage = "_send_job_request" + job.exception_stage = "_trigger_job_execution" exceptions.append( ExceptionInfo( exception=job.exception, @@ -1804,13 +1850,12 @@ async def _score_job(ctx: BatchContext, job: Job) -> None: time_took_sec = job.time_took.total_seconds() - # TODO separate correctness check from scoring in job generator - job.correct, comment, score = job.job_generator.verify( + job.correct, comment = job.job_generator.verify_correctness( job.job_response, - time_took_sec, ) - if time_took_sec > job.job_generator.timeout_seconds(): + if job.job_generator.verify_time(time_took_sec) == False: # noqa ; it can be None, in which case we + # don't trigger the code below job.comment = f"took too long: {time_took_sec=:.2f}" logger.info("%s %s", job.name, job.comment) job.system_event( @@ -1822,7 +1867,7 @@ async def _score_job(ctx: BatchContext, job: Job) -> None: job.success = job.correct job.comment = comment - job.score = score + job.score = float(job.correct) if job.success: job.system_event( @@ -2265,12 +2310,18 @@ async def execute_synthetic_batch_run( await _multi_send_initial_job_request(ctx) executor_ready_jobs = await _get_executor_ready_jobs(ctx) + any_job_busy = any( job.decline_reason() == V0DeclineJobRequest.Reason.BUSY for job in ctx.jobs.values() ) + + await ctx.checkpoint_system_event("_multi_send_job_request_for_streaming") + await _multi_send_job_request_for_streaming(ctx, executor_ready_jobs) + if executor_ready_jobs: - await ctx.checkpoint_system_event("_multi_send_job_request") - await _multi_send_job_request(ctx, executor_ready_jobs) + await ctx.checkpoint_system_event("_trigger_job_execution") + + await _trigger_job_execution(ctx, executor_ready_jobs) # don't persist system events before this point, we want to minimize # any extra interactions which could slow down job processing before @@ -2309,7 +2360,7 @@ async def execute_synthetic_batch_run( await ctx.loop_profiler.close() except (Exception, asyncio.CancelledError) as exc: - logger.error("Synthetic jobs batch failure: %r", exc) + logger.error("Synthetic jobs batch failure: ", exc_info=exc) ctx.system_event( type=SystemEvent.EventType.VALIDATOR_FAILURE, subtype=SystemEvent.EventSubType.GENERIC_ERROR, diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py index 6b4624cbd..76b13f48c 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py @@ -1,4 +1,5 @@ import abc +import asyncio import uuid from compute_horde.base.docker import DockerRunOptionsPreset @@ -33,6 +34,23 @@ def docker_image_name(self) -> str: ... @abc.abstractmethod def docker_run_options_preset(self) -> DockerRunOptionsPreset: ... + async def streaming_preparation_timeout(self) -> float | None: + """For streaming jobs, the timeout between sending a JobRequest and receiving StreamingReadyRequest""" + return None + + async def trigger_streaming_job_execution( + self, + job_uuid, + start_barrier: asyncio.Barrier, + server_public_key, + client_key_pair, + server_address, + server_port, + ): + """For streaming jobs, perform whatever calls are necessary to trigger actions required to perform validation + of given executor class.""" + raise NotImplementedError + def docker_run_cmd(self) -> list[str]: return [] @@ -46,7 +64,12 @@ async def output_upload(self) -> OutputUpload | None: return None @abc.abstractmethod - def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]: ... + def verify_time(self, time_took: float) -> bool | None: + """Check whether the job finished in time. Return None if no data available (for example it didn't finish)""" + + @abc.abstractmethod + def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: + """Check whether the job yielded the right result.""" @abc.abstractmethod def job_description(self) -> str: ... diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py index 933ef0529..300532cf0 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py @@ -103,25 +103,16 @@ def raw_script(self) -> str | None: def volume(self) -> Volume | None: return InlineVolume(contents=single_file_zip("payload.txt", self.hash_job.payload)) - def score(self, time_took: float) -> float: - if self.weights_version == 0: - return MAX_SCORE * (1 - (time_took / (2 * self.timeout_seconds()))) - elif self.weights_version in [1, 2]: - return 1 / time_took - elif self.weights_version in [3, 4]: - return 1 if time_took <= self.timeout_seconds() else 0 - else: - raise RuntimeError(f"No score function for weights_version: {self.weights_version}") + def verify_time(self, time_took: float) -> bool | None: + return time_took <= self.timeout_seconds() - def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]: + def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: if str(msg.docker_process_stdout).strip() != str(self.expected_answer): return ( False, f"result does not match expected answer: {self.expected_answer}, msg: {msg.model_dump_json()}", - 0, ) - - return True, "", self.score(time_took) + return True, "" def job_description(self) -> str: return f"Hashcat {self.hash_job}" diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py index 79d351639..ea4907474 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py @@ -1,3 +1,7 @@ +import asyncio +import logging +import tempfile +import time import uuid import httpx @@ -15,8 +19,15 @@ get_public_url, ) +from ...dynamic_config import aget_config from .base import BaseSyntheticJobGenerator +logger = logging.getLogger(__name__) + + +STREAMING_PROCESSING_TIMEOUT = 30 +STREAMING_PROCESSING_TIMEOUT_LEEWAY = 5 + class LlmPromptsJobGenerator(BaseSyntheticJobGenerator): def __init__( @@ -37,6 +48,7 @@ def __init__( self.s3_output_bucket = settings.S3_BUCKET_NAME_ANSWERS self.prompt_answers: dict[str, str] = {} + self.streaming_processing_time: float | None = None def _url_for_upload(self) -> str: return generate_upload_url( @@ -104,11 +116,14 @@ async def download_answers(self, client: httpx.AsyncClient | None = None): response = await download_file_content(self.url_for_download(), client=client) self.prompt_answers = pydantic.TypeAdapter(dict[str, str]).validate_json(response) - def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]: + def verify_time(self, time_took: float) -> bool | None: + return True + + def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: # just check if there are any answers if self.prompt_answers == {}: - return False, "no answers", 0.0 - return True, "answers exist", 1.0 + return False, "no answers" + return True, "answers exist" def job_description(self) -> str: return "LLM prompts job" @@ -116,6 +131,12 @@ def job_description(self) -> str: def volume_in_initial_req(self) -> bool: return True + async def streaming_preparation_timeout(self) -> float | None: + """For streaming jobs, the timeout between sending a JobRequest and receiving StreamingReadyRequest""" + # TODO: caching + val = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT") + return float(val) if val is not None else None + class LlmPromptsSyntheticJobGenerator(LlmPromptsJobGenerator): def __init__( @@ -145,5 +166,90 @@ def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str return True, "", 1.0 + def verify_time(self, time_took: float) -> bool | None: + if not self.streaming: + return time_took <= self.timeout_seconds() + if self.streaming_processing_time is None: + return None + return self.streaming_processing_time <= STREAMING_PROCESSING_TIMEOUT + + def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: + for expected_prompt in self.expected_prompts: + if expected_prompt.content not in self.prompt_answers: + return False, "result does not contain all answers" + if expected_prompt.answer != self.prompt_answers[expected_prompt.content]: + return False, "results does not match expected answers" + + return True, "" + def job_description(self) -> str: return ("Streaming " if self.streaming else "") + "LLM prompts synthetic job" + + async def trigger_streaming_job_execution( + self, + job_uuid, + start_barrier: asyncio.Barrier, + server_public_key, + client_key_pair, + server_address, + server_port, + ): + with tempfile.NamedTemporaryFile(delete=True, mode="w+") as executor_cert_file: + executor_cert_file.write(server_public_key) + executor_cert_file.flush() + + logger.debug(f"Waiting for streaming start barrier for {job_uuid}") + await start_barrier.wait() + logger.debug(f"Passed streaming start barrier for {job_uuid}") + + url = f"https://{server_address}:{server_port}/execute-job" + logger.debug("About to send seed to %s (job_uuid=%s)", url, job_uuid) + + async with httpx.AsyncClient( + verify=executor_cert_file.name, + cert=client_key_pair, + timeout=STREAMING_PROCESSING_TIMEOUT + STREAMING_PROCESSING_TIMEOUT_LEEWAY, + ) as client: + # send the seed to the executor to start the streaming job + try: + t_before = time.time() + r = await client.post( + url, json={"seed": self.seed}, headers={"Host": server_address} + ) + except Exception as e: + msg = f"Failed to execute streaming job {job_uuid} on {url}: {e}" + logger.debug(msg) + else: + try: + r.raise_for_status() + self.streaming_processing_time = time.time() - t_before + except Exception as e: + msg = f"Failed to execute streaming job {job_uuid} on {url}: {e}, the response was: {r.content[:100]!r}" + logger.debug(msg) + else: + logger.debug( + "Successfully sent seed to %s (job_uuid=%s), the result is: %s", + url, + job_uuid, + r.content[:100], + ) + + url = f"https://{server_address}:{server_port}/terminate" + logger.debug("About to terminate (job_uuid=%s)", job_uuid) + try: + r = await client.get(url, headers={"Host": server_address}) + except Exception as e: + msg = f"Failed to terminate streaming job {job_uuid} on {url}: {e}" + logger.debug(msg) + else: + try: + r.raise_for_status() + except Exception as e: + msg = f"Failed to terminate streaming job {job_uuid} on {url}: {e}, the response was: {r.content[:100]!r}" + logger.debug(msg) + else: + logger.debug( + "Successfully terminated %s (job_uuid=%s)", + url, + job_uuid, + ) diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/helpers.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/helpers.py index 22b4b558c..832075b65 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/helpers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/helpers.py @@ -1,3 +1,4 @@ +import re import uuid from collections.abc import Sequence from datetime import timedelta @@ -14,11 +15,15 @@ ) -async def check_synthetic_job(job_uuid: uuid.UUID, miner_id: int, status: str, score: float): +async def check_synthetic_job( + job_uuid: uuid.UUID, miner_id: int, status: str, score: float, comment: re.Pattern | None = None +): job = await SyntheticJob.objects.aget(job_uuid=job_uuid) assert job.miner_id == miner_id, f"{job.miner_id} != {miner_id}" assert job.status == status, f"{job.status} != {status}" assert job.score == score, f"{job.score} != {score}" + if comment: + assert comment.match(job.comment), f"{job.comment} does not match {comment}" async def check_miner_job_system_events( diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py index e5f4c1ea7..34f8724d1 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py @@ -27,6 +27,12 @@ class MockSyntheticJobGenerator(BaseSyntheticJobGenerator): + def verify_time(self, time_took: float) -> bool | None: + return True + + def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: + return True, "mock" + def __init__(self, _uuid: uuid.UUID, **kwargs): super().__init__(**kwargs) self._uuid = _uuid @@ -76,11 +82,16 @@ async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSynthetic class LlmPromptsSyntheticJobGeneratorFactory: def __init__( - self, uuids: list[uuid.UUID], prompt_samples: list[PromptSample], prompts: list[Prompt] + self, + uuids: list[uuid.UUID], + prompt_samples: list[PromptSample], + prompts: list[Prompt], + streaming: bool = False, ): self._uuids = uuids self._prompt_samples = prompt_samples self._prompts = prompts + self._streaming = streaming async def create( self, executor_class: ExecutorClass, *args, **kwargs @@ -90,7 +101,7 @@ async def create( expected_prompts=self._prompts, s3_url="mock", seed=0, - streaming=False, + streaming=self._streaming, ) generator._uuid = self._uuids.pop(0) return generator diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_batch.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_batch.py index 79479664a..8aed09c66 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_batch.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_batch.py @@ -229,9 +229,6 @@ async def check_scores( job_uuid = receipt["payload"]["job_uuid"] - time_took_us = receipt["payload"]["time_took_us"] - time_took = time_took_us / 1_000_000 - job = await SyntheticJob.objects.aget(job_uuid=job_uuid) - assert abs(job.score * time_took - expected_multiplier) < 0.0001 + assert abs(job.score - expected_multiplier) < 0.0001 diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_multiple_miners.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_multiple_miners.py index c3a6e727e..c2ebe57fc 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_multiple_miners.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_multiple_miners.py @@ -6,8 +6,10 @@ from unittest.mock import patch import bittensor +import httpx import pytest import pytest_asyncio +from compute_horde.certificate import generate_certificate_at from compute_horde.executor_class import ( DEFAULT_EXECUTOR_CLASS, DEFAULT_LLM_EXECUTOR_CLASS, @@ -40,8 +42,9 @@ ) from compute_horde_validator.validator.tests.transport import MinerSimulationTransport +from ...synthetic_jobs.generator import llm_prompts from .helpers import check_miner_job_system_events, check_synthetic_job, generate_prompts -from .mock_generator import MOCK_SCORE, NOT_SCORED, LlmPromptsSyntheticJobGeneratorFactory +from .mock_generator import NOT_SCORED, LlmPromptsSyntheticJobGeneratorFactory pytestmark = [ pytest.mark.asyncio, @@ -133,6 +136,11 @@ def _create(ctx: BatchContext, miner_hotkey: str): return _create +@pytest.fixture +def ssl_public_key(): + return generate_certificate_at()[1] + + async def test_all_succeed( axon_dict: dict[str, bittensor.AxonInfo], transports: list[MinerSimulationTransport], @@ -169,19 +177,11 @@ async def test_all_succeed( ) for job_uuid, miner in zip(job_uuids, miners): - await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.COMPLETED, MOCK_SCORE) + await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.COMPLETED, 1) -async def test_all_streaming_succeed( - axon_dict: dict[str, bittensor.AxonInfo], - transports: list[MinerSimulationTransport], - miners: list[Miner], - create_simulation_miner_client: Callable, - job_uuids: list[uuid.UUID], - streaming_manifest_message: str, - httpx_mock: HTTPXMock, - mocker: MockerFixture, - settings, +async def prep_mocks_for_streaming( + mocker: MockerFixture, httpx_mock: HTTPXMock, job_uuids: list[uuid.UUID], settings ): prompts, prompt_samples = await generate_prompts(num_miners=len(job_uuids)) mocker.patch( @@ -191,9 +191,14 @@ async def test_all_streaming_succeed( mocker.patch( "compute_horde_validator.validator.synthetic_jobs.generator.current.synthetic_job_generator_factory", LlmPromptsSyntheticJobGeneratorFactory( - uuids=job_uuids.copy(), prompt_samples=prompt_samples, prompts=prompts + uuids=job_uuids.copy(), + prompt_samples=prompt_samples, + prompts=prompts, + streaming=True, ), ) + mocker.patch.object(llm_prompts, "STREAMING_PROCESSING_TIMEOUT", 1) + mocker.patch.object(llm_prompts, "STREAMING_PROCESSING_TIMEOUT_LEEWAY", 0.5) httpx_mock.add_response( url=re.compile( @@ -201,9 +206,34 @@ async def test_all_streaming_succeed( ), json={p.content: p.answer for p in prompts}, ) + + async def sleepy_request(*_): + await asyncio.sleep(2) + return httpx.Response(201) + + httpx_mock.add_callback(sleepy_request, url=re.compile("https://127.0.0.1:8007.*")) + + +@pytest.mark.override_config( + DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT=0.5, +) +async def test_some_streaming_succeed( + axon_dict: dict[str, bittensor.AxonInfo], + transports: list[MinerSimulationTransport], + miners: list[Miner], + create_simulation_miner_client: Callable, + job_uuids: list[uuid.UUID], + streaming_manifest_message: str, + httpx_mock: HTTPXMock, + mocker: MockerFixture, + ssl_public_key: str, + settings, +): + await prep_mocks_for_streaming(mocker, httpx_mock, job_uuids, settings) # generator will solve to the right answer MOCK_SCORE = 1.0 + port = 8000 for job_uuid, transport in zip(job_uuids, transports): await transport.add_message(streaming_manifest_message, send_before=1) @@ -215,16 +245,20 @@ async def test_all_streaming_succeed( ).model_dump_json() await transport.add_message(executor_ready_message, send_before=0) - streaming_ready_message = miner_requests.V0StreamingJobReadyRequest( - job_uuid=str(job_uuid), public_key="123", ip="127.0.0.1", port=8000 - ).model_dump_json() - await transport.add_message(streaming_ready_message, send_before=0) + if job_uuid != job_uuids[-1]: + streaming_ready_message = miner_requests.V0StreamingJobReadyRequest( + job_uuid=str(job_uuid), + public_key=ssl_public_key, + ip="127.0.0.1", + port=(port := port + 1), + ).model_dump_json() + await transport.add_message(streaming_ready_message, send_before=0) - job_finish_message = miner_requests.V0JobFinishedRequest( - job_uuid=str(job_uuid), docker_process_stdout="", docker_process_stderr="" - ).model_dump_json() + job_finish_message = miner_requests.V0JobFinishedRequest( + job_uuid=str(job_uuid), docker_process_stdout="", docker_process_stderr="" + ).model_dump_json() - await transport.add_message(job_finish_message, send_before=2) + await transport.add_message(job_finish_message, send_before=2) await asyncio.wait_for( execute_synthetic_batch_run( @@ -233,11 +267,22 @@ async def test_all_streaming_succeed( [], create_miner_client=create_simulation_miner_client, ), - timeout=1, + timeout=10, ) for job_uuid, miner in zip(job_uuids, miners): - await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.COMPLETED, MOCK_SCORE) + if job_uuid == job_uuids[-1]: + await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.FAILED, 0) + elif job_uuid == job_uuids[-2]: + await check_synthetic_job( + job_uuid, + miner.pk, + SyntheticJob.Status.FAILED, + 0, + re.compile("took too long: time_took_sec=.*"), + ) + else: + await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.COMPLETED, MOCK_SCORE) @pytest_asyncio.fixture @@ -495,7 +540,7 @@ async def test_complex( == 8 ) - await check_synthetic_job(job_uuids[0], miners[0].pk, SyntheticJob.Status.COMPLETED, MOCK_SCORE) + await check_synthetic_job(job_uuids[0], miners[0].pk, SyntheticJob.Status.COMPLETED, 1) await check_miner_job_system_events( [ ( diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_single_miner.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_single_miner.py index 57fcc0cfc..616143469 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_single_miner.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_single_miner.py @@ -18,7 +18,6 @@ from ..helpers import check_system_events from .helpers import check_synthetic_job from .mock_generator import ( - MOCK_SCORE, NOT_SCORED, ) @@ -54,7 +53,7 @@ async def test_execute_miner_synthetic_jobs_success( timeout=1, ) - await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.COMPLETED, MOCK_SCORE) + await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.COMPLETED, 1) await sync_to_async(check_system_events)( SystemEvent.EventType.MINER_SYNTHETIC_JOB_SUCCESS, SystemEvent.EventSubType.SUCCESS ) diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_utils.py b/validator/app/src/compute_horde_validator/validator/tests/test_utils.py index 9ba3a36a6..029412cf2 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_utils.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_utils.py @@ -20,9 +20,6 @@ V0JobFailedRequest, V0JobFinishedRequest, ) -from compute_horde.mv_protocol.validator_requests import ( - V0JobFinishedReceiptRequest, -) from django.utils.timezone import now from compute_horde_validator.validator.models import ( @@ -49,7 +46,7 @@ check_system_events, ) -MOCK_SCORE = 0.8 +MOCK_SCORE = 1 MANIFEST_INCENTIVE_MULTIPLIER = 1.05 MANIFEST_DANCE_RATIO_THRESHOLD = 1.4 MANIFEST_INCENTIVE_APPLIED_SCORE = MOCK_SCORE * MANIFEST_INCENTIVE_MULTIPLIER @@ -59,6 +56,12 @@ class MockSyntheticJobGenerator(BaseSyntheticJobGenerator): + def verify_time(self, time_took: float) -> bool | None: + return True + + def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: + return True, "mock" + async def ainit(self, miner_hotkey: str): return @@ -80,9 +83,6 @@ def docker_run_cmd(self) -> list[str]: async def volume(self) -> Volume | None: return InlineVolume(contents="mock") - def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]: - return True, "mock", MOCK_SCORE - def job_description(self) -> str: return "mock" @@ -468,13 +468,8 @@ async def interaction_callback(miner_client, after_job_sent): interaction_callback, ) - miner_client = mocked_synthetic_miner_client.instance async for job in SyntheticJob.objects.filter(job_uuid__in=job_uuids): - receipt = miner_client._query_sent_models( - lambda m, j=job: m.payload.job_uuid == str(j.job_uuid), V0JobFinishedReceiptRequest - )[0] - time_took = receipt.payload.time_took_us / 1_000_000 - assert abs(job.score * time_took - expected_multiplier) < 0.0001 + assert abs(job.score - expected_multiplier) < 0.0001 @patch( @@ -556,13 +551,8 @@ async def interaction_callback(miner_client, after_job_sent): miner_hotkey="miner_hotkey", ) - miner_client = mocked_synthetic_miner_client.instance for job in SyntheticJob.objects.filter(job_uuid__in=job_uuids): - receipt = miner_client._query_sent_models( - lambda m, j=job: m.payload.job_uuid == str(j.job_uuid), V0JobFinishedReceiptRequest - )[0] - time_took = receipt.payload.time_took_us / 1_000_000 - assert abs(job.score * time_took - expected_multiplier) < 0.0001 + assert abs(job.score - expected_multiplier) < 0.0001 mocked_metagraph_1 = MagicMock(side_effect=[ValueError, TypeError, MockMetagraph()])