Commit cf8a911

Merge pull request #390 from backend-developers-ltd/fix-stearming-synthetic

Fix stearming synthetic
mpnowacki-reef authored Jan 31, 2025
2 parents 940f094 + 9203142 commit cf8a911
Showing 12 changed files with 392 additions and 162 deletions.
4 changes: 3 additions & 1 deletion compute_horde/compute_horde/miner_client/base.py
@@ -111,7 +111,9 @@ async def read_messages(self):
 try:
     msg = self.accepted_request_type().parse(msg)
 except ValidationError as ex:
-    error_msg = f"Malformed message from miner {self.miner_name}: {str(ex)}"
+    if msg == "PING":
+        continue
+    error_msg = f"Malformed message {msg} from miner {self.miner_name}: {str(ex)}"
     logger.info(error_msg)
     self.deferred_send_model(self.build_outgoing_generic_error(error_msg))
     continue
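The new guard skips keepalive frames before pydantic validation: a bare "PING" is not JSON, so it would otherwise be logged as a malformed message on every heartbeat. A minimal standalone sketch of the same pattern (the JobMessage model and parse_incoming helper are illustrative, not compute_horde classes):

# Hypothetical model and helper illustrating the PING guard; not compute_horde code.
import pydantic


class JobMessage(pydantic.BaseModel):
    message_type: str
    job_uuid: str


def parse_incoming(raw: str) -> JobMessage | None:
    # Keepalive frames are plain text, not JSON, so they would always fail
    # validation and spam the log with "malformed message" errors.
    if raw == "PING":
        return None
    try:
        return JobMessage.model_validate_json(raw)
    except pydantic.ValidationError as ex:
        print(f"Malformed message {raw!r}: {ex}")
        return None


print(parse_incoming("PING"))  # None, and no error is logged
print(parse_incoming('{"message_type": "job", "job_uuid": "abc"}'))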
10 changes: 10 additions & 0 deletions validator/app/src/compute_horde_validator/settings.py
@@ -609,6 +609,16 @@ def wrapped(*args, **kwargs):
"level": "WARNING",
"propagate": True,
},
"botocore": {
"handlers": ["console"],
"level": "WARNING",
"propagate": True,
},
"httpcore.http11": {
"handlers": ["console"],
"level": "WARNING",
"propagate": True,
},
},
}
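For reference, a self-contained sketch of what these two LOGGING entries do: dictConfig raises the named third-party loggers to WARNING so their DEBUG/INFO chatter is dropped (the handler definition here is simplified relative to the project's settings):

# Standalone sketch of the settings change: silence noisy third-party loggers.
import logging
import logging.config

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "loggers": {
        "botocore": {"handlers": ["console"], "level": "WARNING", "propagate": True},
        "httpcore.http11": {"handlers": ["console"], "level": "WARNING", "propagate": True},
    },
})

logging.getLogger("httpcore.http11").info("request sent")  # below WARNING: dropped
logging.getLogger("botocore").warning("credentials expire soon")  # emitted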

@@ -86,7 +86,6 @@
     BaseSyntheticJobGenerator,
 )
 from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import (
-    LlmPromptsJobGenerator,
     LlmPromptsSyntheticJobGenerator,
 )
 from compute_horde_validator.validator.synthetic_jobs.scoring import get_manifest_multiplier
@@ -1320,7 +1319,6 @@ async def _handle_job_accepted(client: MinerClient, ctx: BatchContext, job: Job)
 async def _send_job_request(
     ctx: BatchContext,
     start_barrier: asyncio.Barrier,
-    streaming_start_barrier: asyncio.Barrier | None,
     job_uuid: str,
 ) -> None:
     await start_barrier.wait()
@@ -1343,20 +1341,95 @@ async def _send_job_request(
     request_json = request.model_dump_json()
 
     timeout = job.job_generator.timeout_seconds() + _JOB_RESPONSE_EXTRA_TIMEOUT
+
     async with asyncio.timeout(timeout):
         # send can block, so take a timestamp
         # on both sides to detect long send times
         job.job_before_sent_time = datetime.now(tz=UTC)
         await client.send_check(request_json)
         job.job_after_sent_time = datetime.now(tz=UTC)
 
-        # if streaming job
-        if streaming_start_barrier is not None:
-            await _trigger_streaming_job(ctx, streaming_start_barrier, job_uuid)
-
         await job.job_response_event.wait()
 
 
+async def _send_job_request_for_streaming(
+    ctx: BatchContext,
+    job_uuid: str,
+):
+    job = ctx.jobs[job_uuid]
+    client = ctx.clients[job.miner_hotkey]
+
+    request = V0JobRequest(
+        job_uuid=job.uuid,
+        executor_class=job.executor_class,
+        docker_image_name=job.job_generator.docker_image_name(),
+        docker_run_options_preset=job.job_generator.docker_run_options_preset(),
+        docker_run_cmd=job.job_generator.docker_run_cmd(),
+        raw_script=job.job_generator.raw_script(),
+        volume=job.volume if not job.job_generator.volume_in_initial_req() else None,
+        output_upload=job.output_upload,
+    )
+    request_json = request.model_dump_json()
+
+    job.job_before_sent_time = datetime.now(tz=UTC)
+    await client.send_check(request_json)
+    job.job_after_sent_time = datetime.now(tz=UTC)
+
+    try:
+        async with asyncio.timeout(await job.job_generator.streaming_preparation_timeout()):
+            await job.streaming_job_ready_response_event.wait()
+    except TimeoutError:
+        logger.debug("Timeout waiting for StreamingJobReadyRequest for job_uuid=%s", job_uuid)
+        return
+
+    logger.debug(f"Received streaming job ready response for {job_uuid}")
+
+    response = job.streaming_job_ready_response
+    if not isinstance(response, V0StreamingJobReadyRequest):
+        logger.warning(f"Bad job ready response for {job_uuid}: {response}")
+
+
+async def _multi_send_job_request_for_streaming(
+    ctx: BatchContext,
+    executor_ready_jobs: list[tuple[str, bool]],
+):
+    streaming_jobs = [job_uuid for job_uuid, is_streaming in executor_ready_jobs if is_streaming]
+    if not streaming_jobs:
+        logger.debug("No streaming jobs to run")
+        return
+    logger.debug("Sending job request for %s streaming jobs", len(streaming_jobs))
+
+    tasks = [
+        asyncio.create_task(
+            _send_job_request_for_streaming(ctx, job_uuid),
+            name=f"{job_uuid}._send_job_request_for_streaming",
+        )
+        for job_uuid in streaming_jobs
+    ]
+
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+
+    exceptions: list[ExceptionInfo] = []
+    for i, result in enumerate(results):
+        if isinstance(result, BaseException):
+            job_uuid = streaming_jobs[i]
+            job = ctx.jobs[job_uuid]
+            job.exception = result
+            job.exception_time = datetime.now(tz=UTC)
+            job.exception_stage = "_send_job_request_for_streaming"
+            exceptions.append(
+                ExceptionInfo(
+                    exception=job.exception,
+                    miner_hotkey=job.miner_hotkey,
+                    job_uuid=job.uuid,
+                    stage=job.exception_stage,
+                )
+            )
+        else:
+            assert result is None
+    _handle_exceptions(ctx, exceptions)
+
+
 async def _send_job_finished_receipts(ctx: BatchContext) -> None:
     for job in ctx.jobs.values():
         # generate job finished receipts for all jobs
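The new _send_job_request_for_streaming waits for the executor's readiness event under a deadline instead of blocking the batch forever. A minimal standalone sketch of that wait-with-timeout pattern (Python 3.11+, names illustrative):

# Sketch of the deadline-bounded event wait used above (Python 3.11+).
import asyncio


async def wait_for_ready(ready: asyncio.Event, timeout_s: float) -> bool:
    try:
        async with asyncio.timeout(timeout_s):
            await ready.wait()
    except TimeoutError:
        # Give up on this job instead of blocking the whole batch.
        return False
    return True


async def main() -> None:
    ready = asyncio.Event()
    asyncio.get_running_loop().call_later(0.05, ready.set)
    print(await wait_for_ready(ready, 1.0))             # True: signalled in time
    print(await wait_for_ready(asyncio.Event(), 0.05))  # False: deadline hit


asyncio.run(main())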
@@ -1583,63 +1656,6 @@ async def _multi_send_initial_job_request(ctx: BatchContext) -> None:
     _handle_exceptions(ctx, exceptions)
 
 
-async def _trigger_streaming_job(
-    ctx: BatchContext, streaming_start_barrier: asyncio.Barrier, job_uuid: str
-) -> None:
-    job = ctx.jobs[job_uuid]
-    response = None
-    try:
-        timeout = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT")
-        async with asyncio.timeout(timeout):
-            await job.streaming_job_ready_response_event.wait()
-        logger.debug(f"Received streaming job ready response for {job_uuid}")
-
-        response = job.streaming_job_ready_response
-        if not isinstance(response, V0StreamingJobReadyRequest):
-            return
-
-        # Save job certificate received from executor
-        executor_cert_path = ctx.certs_basepath / "ssl" / f"executor_certificate_{job_uuid}.pem"
-        executor_cert_path.write_text(response.public_key)
-
-        # provide the synthetic job prompt seed to the streaming job
-        if isinstance(job.job_generator, LlmPromptsJobGenerator):
-            seed = job.job_generator.seed
-        else:
-            logger.error(f"Bad streaming job generator type: {job.job_generator}")
-            return
-
-    finally:
-        # !!! it's very important we wait on this barrier, no matter what happens above,
-        # if we don't wait, other concurrent jobs will hang forever since they will
-        # never pass this barrier
-        await streaming_start_barrier.wait()
-
-    if not isinstance(response, V0StreamingJobReadyRequest):
-        return
-
-    async with httpx.AsyncClient(
-        verify=str(executor_cert_path),
-        cert=ctx.own_certs,
-        timeout=job.job_generator.timeout_seconds(),
-    ) as client:
-        # send the seed to the executor to start the streaming job
-        url = f"https://{response.ip}:{response.port}/execute-job"
-        try:
-            r = await client.post(url, json={"seed": seed}, headers={"Host": response.ip})
-            r.raise_for_status()
-        except Exception as e:
-            msg = f"Failed to execute streaming job {job_uuid} on {url}: {e}"
-            logger.warning(msg)
-            raise Exception(msg)
-        finally:
-            # schedule the job to terminate
-            url = f"https://{response.ip}:{response.port}/terminate"
-            r = await client.get(url, headers={"Host": response.ip})
-            if r.status_code != 200:
-                logger.warning(f"Failed to terminate streaming job {job_uuid} on {url}")
-
-
 async def _get_executor_ready_jobs(ctx: BatchContext) -> list[tuple[str, bool]]:
     streaming_classes = await get_streaming_job_executor_classes()

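The deleted _trigger_streaming_job guarded its barrier wait with try/finally, as its "!!!" comment stresses: every participant must reach an asyncio.Barrier or all the others wait forever. A standalone sketch of that invariant (Python 3.11+, names illustrative):

# Sketch: always reach the barrier, even when setup fails (Python 3.11+).
import asyncio


async def participant(barrier: asyncio.Barrier, fail: bool) -> None:
    ok = True
    try:
        if fail:
            ok = False  # setup failed; still fall through to the barrier
    finally:
        # Skipping this wait would deadlock every other participant.
        await barrier.wait()
    if ok:
        print("proceeding past the barrier")


async def main() -> None:
    barrier = asyncio.Barrier(3)
    await asyncio.gather(*(participant(barrier, fail=(i == 0)) for i in range(3)))


asyncio.run(main())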
@@ -1656,42 +1672,72 @@ async def _get_executor_ready_jobs(ctx: BatchContext) -> list[tuple[str, bool]]:
     return executor_ready_jobs
 
 
-async def _multi_send_job_request(
+async def _trigger_job_execution(
     ctx: BatchContext, executor_ready_jobs: list[tuple[str, bool]]
 ) -> None:
     assert executor_ready_jobs
-    logger.info("Sending job requests for %d ready jobs", len(executor_ready_jobs))
+    buffered_tasks = []
+    streaming_tasks = []
 
-    start_barrier = asyncio.Barrier(len(executor_ready_jobs))
+    start_barrier = asyncio.Barrier(
+        len(
+            [
+                (job_uuid, is_streaming)
+                for job_uuid, is_streaming in executor_ready_jobs
+                if not is_streaming
+                or isinstance(
+                    ctx.jobs[job_uuid].streaming_job_ready_response, V0StreamingJobReadyRequest
+                )
+            ]
+        )
+    )
 
-    num_streaming_jobs = len(
-        [job_uuid for job_uuid, is_streaming in executor_ready_jobs if is_streaming]
-    )
-    if num_streaming_jobs > 0:
-        streaming_start_barrier = asyncio.Barrier(num_streaming_jobs)
-    else:
-        streaming_start_barrier = None
-
-    tasks = [
-        asyncio.create_task(
-            _send_job_request(
-                ctx, start_barrier, streaming_start_barrier if is_streaming else None, job_uuid
-            ),
-            name=f"{job_uuid}._send_job_request",
-        )
-        for job_uuid, is_streaming in executor_ready_jobs
-    ]
+    for job_uuid, is_streaming in executor_ready_jobs:
+        if not is_streaming:
+            buffered_tasks.append(
+                asyncio.create_task(
+                    _send_job_request(ctx, start_barrier, job_uuid),
+                    name=f"{job_uuid}._trigger_job_execution",
+                )
+            )
+        elif isinstance(
+            streaming_response := ctx.jobs[job_uuid].streaming_job_ready_response,
+            V0StreamingJobReadyRequest,
+        ):
+            streaming_tasks.append(
+                asyncio.create_task(
+                    ctx.jobs[job_uuid].job_generator.trigger_streaming_job_execution(
+                        job_uuid,
+                        start_barrier,
+                        streaming_response.public_key,
+                        ctx.own_certs,
+                        streaming_response.ip,
+                        streaming_response.port,
+                    ),
+                    name=f"{job_uuid}._trigger_job_execution",
+                )
+            )
+        else:
+            logger.debug(f"Not triggering execution for streaming job {job_uuid}")
+
+    logger.info(
+        "Sending job requests for %d ready jobs - %s streaming and %s buffered",
+        len(streaming_tasks) + len(buffered_tasks),
+        len(streaming_tasks),
+        len(buffered_tasks),
+    )
 
-    results = await asyncio.gather(*tasks, return_exceptions=True)
+    results = await asyncio.gather(*(streaming_tasks + buffered_tasks), return_exceptions=True)
 
     exceptions: list[ExceptionInfo] = []
     for i, result in enumerate(results):
         if isinstance(result, BaseException):
+            logger.exception("When running Synthetic jobs", exc_info=result)
             job_uuid = executor_ready_jobs[i][0]
             job = ctx.jobs[job_uuid]
             job.exception = result
             job.exception_time = datetime.now(tz=UTC)
-            job.exception_stage = "_send_job_request"
+            job.exception_stage = "_trigger_job_execution"
             exceptions.append(
                 ExceptionInfo(
                     exception=job.exception,
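Both _multi_send_job_request_for_streaming and _trigger_job_execution use the same fan-out shape: one task per job, gather(return_exceptions=True), then map failures back to the originating job. A reduced sketch (job names illustrative); pairing results with the same list the tasks were built from is what keeps the mapping correct:

# Reduced sketch of the per-job fan-out with exception bookkeeping.
import asyncio


async def send_for_job(job_uuid: str) -> None:
    if job_uuid == "job-2":
        raise RuntimeError("miner unreachable")


async def main() -> None:
    job_uuids = ["job-1", "job-2", "job-3"]
    tasks = [
        asyncio.create_task(send_for_job(u), name=f"{u}.send_for_job")
        for u in job_uuids
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for job_uuid, result in zip(job_uuids, results):
        if isinstance(result, BaseException):
            print(f"{job_uuid} failed: {result!r}")  # recorded per job, batch continues
        else:
            assert result is None


asyncio.run(main())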
@@ -1804,13 +1850,12 @@ async def _score_job(ctx: BatchContext, job: Job) -> None:

     time_took_sec = job.time_took.total_seconds()
 
-    # TODO separate correctness check from scoring in job generator
-    job.correct, comment, score = job.job_generator.verify(
+    job.correct, comment = job.job_generator.verify_correctness(
         job.job_response,
-        time_took_sec,
     )
 
-    if time_took_sec > job.job_generator.timeout_seconds():
+    if job.job_generator.verify_time(time_took_sec) == False:  # noqa ; it can be None, in which case we
+        # don't trigger the code below
         job.comment = f"took too long: {time_took_sec=:.2f}"
         logger.info("%s %s", job.name, job.comment)
         job.system_event(
@@ -1822,7 +1867,7 @@

     job.success = job.correct
     job.comment = comment
-    job.score = score
+    job.score = float(job.correct)
 
     if job.success:
         job.system_event(
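The scoring hunks replace the combined verify call with separate verify_correctness and verify_time checks, and derive the score directly from correctness. A sketch of the assumed contract (signatures inferred from this diff, not authoritative): verify_time may return None, meaning the generator imposes no time limit, so only an explicit False marks the job as too slow:

# Assumed shape of the split verification contract (inferred from the diff).
def verify_correctness(job_response: str) -> tuple[bool, str]:
    correct = job_response == "expected-output"
    return correct, "ok" if correct else "wrong answer"


def verify_time(time_took_sec: float, limit_sec: float | None = 10.0) -> bool | None:
    if limit_sec is None:
        return None  # no opinion: must not be treated as "too slow"
    return time_took_sec <= limit_sec


correct, comment = verify_correctness("expected-output")
score = float(correct)  # 1.0 when correct, 0.0 otherwise
if verify_time(12.5) == False:  # noqa: E712 - None must not trigger this branch
    score = 0.0
    comment = "took too long: 12.50s"
print(score, comment)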
Expand Down Expand Up @@ -2265,12 +2310,18 @@ async def execute_synthetic_batch_run(
         await _multi_send_initial_job_request(ctx)
 
         executor_ready_jobs = await _get_executor_ready_jobs(ctx)
 
+        any_job_busy = any(
+            job.decline_reason() == V0DeclineJobRequest.Reason.BUSY for job in ctx.jobs.values()
+        )
+
+        await ctx.checkpoint_system_event("_multi_send_job_request_for_streaming")
+        await _multi_send_job_request_for_streaming(ctx, executor_ready_jobs)
+
         if executor_ready_jobs:
-            await ctx.checkpoint_system_event("_multi_send_job_request")
-            await _multi_send_job_request(ctx, executor_ready_jobs)
+            await ctx.checkpoint_system_event("_trigger_job_execution")
+
+            await _trigger_job_execution(ctx, executor_ready_jobs)
 
         # don't persist system events before this point, we want to minimize
         # any extra interactions which could slow down job processing before
@@ -2309,7 +2360,7 @@ async def execute_synthetic_batch_run(
             await ctx.loop_profiler.close()
 
     except (Exception, asyncio.CancelledError) as exc:
-        logger.error("Synthetic jobs batch failure: %r", exc)
+        logger.error("Synthetic jobs batch failure: ", exc_info=exc)
         ctx.system_event(
             type=SystemEvent.EventType.VALIDATOR_FAILURE,
             subtype=SystemEvent.EventSubType.GENERIC_ERROR,
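The final hunk swaps "%r" formatting for exc_info=exc, so the log record carries the full traceback rather than just the exception's repr. A minimal demonstration:

# %r logs one line; exc_info attaches the full traceback to the record.
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("batch_run")

try:
    raise ValueError("boom")
except Exception as exc:
    logger.error("Synthetic jobs batch failure: %r", exc)         # repr only
    logger.error("Synthetic jobs batch failure: ", exc_info=exc)  # + traceback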