Skip to content

Commit

Permalink
Small fixes from local branch
Browse files Browse the repository at this point in the history
  • Loading branch information
JackUrb committed Aug 21, 2023
1 parent 72e1deb commit 3ebab62
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 11 deletions.
5 changes: 4 additions & 1 deletion mephisto/abstractions/architects/ec2/ec2_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,10 @@ def try_server_push(subprocess_args: List[str], retries=5, sleep_time=10.0):
except subprocess.CalledProcessError:
retries -= 1
sleep_time *= 1.5
logger.info(f"Timed out trying to push to server. Retries remaining: {retries}")
logger.info(
f"Timed out trying to push to server. CMD: {subprocess_args}. "
f"Retries remaining: {retries}"
)
time.sleep(sleep_time)
raise Exception("Could not successfully push to the ec2 instance. See log for errors.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ class RemoteProcedureBlueprintArgs(
"required": True,
},
)
extra_source_dir: str = field(
default=MISSING,
metadata={
"help": (
"Optional path to sources that the HTML may "
"refer to (such as images/video/css/scripts)"
),
},
)
link_task_source: bool = field(
default=False,
metadata={
Expand Down
4 changes: 2 additions & 2 deletions mephisto/data_model/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def _raise_if_disconnected():
if not self.did_submit.is_set():
# If released without submit, raise timeout
raise AgentTimeoutError(timeout, self.db_id)
# Check disconnect possiblities again
# Check disconnect possibilities again
_raise_if_disconnected()
return self.did_submit.is_set()

Expand Down Expand Up @@ -456,7 +456,7 @@ def get_data_dir(self) -> str:
this agent into
"""
assignment_dir = self.get_assignment().get_data_dir()
return os.path.join(assignment_dir, self.db_id)
return os.path.join(assignment_dir, str(self.db_id))

def update_status(self, new_status: str) -> None:
"""Update the database status of this agent, and
Expand Down
10 changes: 6 additions & 4 deletions mephisto/operations/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,10 @@ def cleanup_runs():
if not shutdown_thread.is_alive():
# Only join if the shutdown fully completed
shutdown_thread.join()
if self._event_loop.is_running():
self._event_loop.stop()
self._event_loop.run_until_complete(self.shutdown_async())
if not self._event_loop.is_running():
self._event_loop.run_until_complete(self.shutdown_async())
else:
asyncio.ensure_future(self.shutdown_async(), loop=self._event_loop)

async def shutdown_async(self):
"""Shut down the asyncio parts of the Operator"""
Expand Down Expand Up @@ -517,7 +518,8 @@ async def _stop_loop_when_no_running_tasks(self, log_rate: Optional[int] = None)
last_log = time.time()
self.print_run_details()
await asyncio.sleep(RUN_STATUS_POLL_TIME)
self._event_loop.stop()
if not self.is_shutdown:
self.shutdown()

def _run_loop_until(self, condition_met: Callable[[], bool], timeout) -> bool:
"""
Expand Down
11 changes: 7 additions & 4 deletions mephisto/tools/examine_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def get_worker_stats(units: List["Unit"]) -> Dict[str, Dict[str, List["Unit"]]]:
"soft_rejected": [],
"rejected": [],
}
previous_work_by_worker[w_id][unit.get_status()].append(unit)
status = unit.get_status()
if status not in previous_work_by_worker[w_id]:
continue
previous_work_by_worker[w_id][status].append(unit)
return previous_work_by_worker


Expand Down Expand Up @@ -250,13 +253,13 @@ def run_examine_by_worker(
should_special_qualify = input(
"Do you want to approve qualify this worker? (y)es/(n)o: "
)
if should_special_qualify.lower() in ["y", "yes"]:
if should_special_qualify.lower().startswith("y"):
worker.grant_qualification(approve_qualification, 1)
elif decision.lower() == "p":
agent.soft_reject_work()
if apply_all_decision is None and block_qualification is not None:
should_soft_block = input("Do you want to soft block this worker? (y)es/(n)o: ")
if should_soft_block.lower() in ["y", "yes"]:
if should_soft_block.lower().startswith("y"):
worker.grant_qualification(block_qualification, 1)
elif decision.lower() == "v":
# Same as "a", except we can specify exact qualification value being assigned
Expand All @@ -271,7 +274,7 @@ def run_examine_by_worker(
if apply_all_decision is None:
reason = input("Why are you rejecting this work? ")
should_block = input("Do you want to hard block this worker? (y)es/(n)o: ")
if should_block.lower() in ["y", "yes"]:
if should_block.lower().startswith("y"):
block_reason = input("Why permanently block this worker? ")
worker.block_worker(block_reason, unit=unit)
agent.reject_work(reason)
Expand Down

0 comments on commit 3ebab62

Please sign in to comment.