Skip to content

Commit

Permalink
Merge pull request #89 from g-votte/async-tb-fix2
Browse files Browse the repository at this point in the history
Fix the hang in `train_agent_async` with Tensorboard
  • Loading branch information
muupan authored Dec 14, 2020
2 parents 7026ca8 + 55e35e1 commit 322fa45
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
32 changes: 25 additions & 7 deletions pfrl/experiments/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ def record_tb_stats(summary_writer, agent_stats, eval_stats, env_stats, t):
summary_writer.flush()


def record_tb_stats_loop(outdir, queue, stop_event):
    """Drain eval-stats tuples from `queue` into a tensorboard writer.

    Runs until `stop_event` is set AND the queue has been fully drained,
    so stats enqueued just before shutdown are still written.

    Args:
        outdir (str): Directory the tensorboard writer logs to.
        queue (multiprocessing.Queue): Source of
            (agent_stats, eval_stats, env_stats, t) tuples.
        stop_event (threading.Event or multiprocessing.Event): Signals that
            no further stats will be enqueued.
    """
    writer = create_tb_writer(outdir)

    while True:
        # wait() with a tiny timeout doubles as a yield point; it returns
        # True once the stop event has been set.
        stop_requested = stop_event.wait(1e-6)
        if stop_requested and queue.empty():
            break
        if not queue.empty():
            agent_stats, eval_stats, env_stats, t = queue.get()
            record_tb_stats(writer, agent_stats, eval_stats, env_stats, t)


def save_agent(agent, t, outdir, logger, suffix=""):
dirname = os.path.join(outdir, "{}{}".format(t, suffix))
agent.save(dirname)
Expand Down Expand Up @@ -476,7 +485,6 @@ class AsyncEvaluator(object):
save_best_so_far_agent (bool): If set to True, after each evaluation,
if the score (= mean return of evaluation episodes) exceeds
the best-so-far score, the current agent is saved.
use_tensorboard (bool): Additionally log eval stats to tensorboard
"""

def __init__(
Expand All @@ -489,7 +497,6 @@ def __init__(
step_offset=0,
save_best_so_far_agent=True,
logger=None,
use_tensorboard=False,
):
assert (n_steps is None) != (n_episodes is None), (
"One of n_steps or n_episodes must be None. "
Expand All @@ -501,7 +508,6 @@ def __init__(
self.n_episodes = n_episodes
self.eval_interval = eval_interval
self.outdir = outdir
self.use_tensorboard = use_tensorboard
self.max_episode_len = max_episode_len
self.step_offset = step_offset
self.save_best_so_far_agent = save_best_so_far_agent
Expand All @@ -518,8 +524,8 @@ def __init__(
with open(os.path.join(self.outdir, "scores.txt"), "a"):
pass

if use_tensorboard:
self.tb_writer = create_tb_writer(outdir)
self.record_tb_stats_queue = None
self.record_tb_stats_thread = None

@property
def max_score(self):
Expand Down Expand Up @@ -563,8 +569,8 @@ def evaluate_and_update_max_score(self, t, episodes, env, agent):
)
record_stats(self.outdir, values)

if self.use_tensorboard:
record_tb_stats(self.tb_writer, agent_stats, eval_stats, env_stats, t)
if self.record_tb_stats_queue is not None:
self.record_tb_stats_queue.put([agent_stats, eval_stats, env_stats, t])

with self._max_score.get_lock():
if mean > self._max_score.value:
Expand All @@ -589,3 +595,15 @@ def evaluate_if_necessary(self, t, episodes, env, agent):
self.wrote_header.value = True
return self.evaluate_and_update_max_score(t, episodes, env, agent)
return None

def start_tensorboard_writer(self, outdir, stop_event):
    """Start a background thread that logs eval stats to tensorboard.

    Evaluation results are handed over via a multiprocessing queue and
    written by `record_tb_stats_loop` until `stop_event` is set and the
    queue is drained.

    Args:
        outdir (str): Directory the tensorboard writer logs to.
        stop_event (multiprocessing.Event): Event used to stop the thread.
    """
    stats_queue = mp.Queue()
    writer_thread = pfrl.utils.StoppableThread(
        target=record_tb_stats_loop,
        args=(outdir, stats_queue, stop_event),
        stop_event=stop_event,
    )
    self.record_tb_stats_queue = stats_queue
    self.record_tb_stats_thread = writer_thread
    writer_thread.start()

def join_tensorboard_writer(self):
self.record_tb_stats_thread.join()
6 changes: 5 additions & 1 deletion pfrl/experiments/train_agent_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,10 @@ def train_agent_async(
max_episode_len=max_episode_len,
step_offset=step_offset,
save_best_so_far_agent=save_best_so_far_agent,
use_tensorboard=use_tensorboard,
logger=logger,
)
if use_tensorboard:
evaluator.start_tensorboard_writer(outdir, stop_event)

if random_seeds is None:
random_seeds = np.arange(processes)
Expand Down Expand Up @@ -312,4 +313,7 @@ def f():

stop_event.set()

if evaluator is not None and use_tensorboard:
evaluator.join_tensorboard_writer()

return agent

0 comments on commit 322fa45

Please sign in to comment.