Skip to content

Commit

Permalink
Attempt to patch #2139.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gamenot committed Jan 22, 2024
1 parent f2e292a commit 0930736
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 59 deletions.
85 changes: 53 additions & 32 deletions smarts/core/sumo_traffic_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def __init__(
self._sim = None
self._handling_error = False
self._traci_retries = traci_retries
# XXX: Holds connections whose TraCI server was likely taken by another client, to try to avoid interrupting that instance in a race condition (see GH #2139).
self._foreign_traci_servers: List[TraciConn] = []

# start with the default recovery flags...
self._recovery_flags = super().recovery_flags
Expand Down Expand Up @@ -209,18 +211,25 @@ def _initialize_traci_conn(self, num_retries=5):
)

try:
while not self._traci_conn.connected:
try:
self._traci_conn.connect(
timeout=5,
minimum_traci_version=20,
minimum_sumo_version=(1, 10, 0),
debug=self._debug,
)
except traci.exceptions.FatalTraCIError:
# Could not connect in time; just retry the connection.
pass
self._traci_conn.connect(
timeout=5,
minimum_traci_version=20,
minimum_sumo_version=(1, 10, 0),
debug=self._debug,
)

if not self._traci_conn.connected:
# Save the connection to try to avoid closing it for the other client.
self._foreign_traci_servers.append(self._traci_conn)
self._traci_conn = None
raise traci.exceptions.TraCIException(
"TraCI server was likely taken by other client."
)

except traci.exceptions.FatalTraCIError:
# Could not connect in time; just retry the connection.
current_retries += 1
continue
except traci.exceptions.TraCIException:
# SUMO process died... unsure why this is not a fatal traci error
current_retries += 1
Expand All @@ -230,7 +239,7 @@ def _initialize_traci_conn(self, num_retries=5):
continue
except KeyboardInterrupt:
self._log.debug("Keyboard interrupted TraCI connection.")
self._traci_conn.close_traci_and_pipes()
self._traci_conn.close_traci_and_pipes(wait=False)
raise
break
else:
Expand All @@ -257,7 +266,7 @@ def _base_sumo_load_params(self):
"--net-file=%s" % self._scenario.road_map.source,
"--quit-on-end",
"--log=%s" % self._log_file,
"--error-log=%s" % self._log_file,
"--error-log=%s.err" % self._log_file,
"--no-step-log",
"--no-warnings=1",
"--seed=%s" % random.randint(0, 2147483648),
Expand Down Expand Up @@ -293,6 +302,13 @@ def _base_sumo_load_params(self):

return load_params

def _restart_sumo(self):
    """Re-initialize the TraCI connection with the applicable retry count.

    The retry count is ``self._traci_retries`` when that value is truthy;
    otherwise it is looked up in the engine configuration under
    ``("sumo", "traci_retries")`` with a default of 5, cast to ``int``.
    The result is passed to ``self._initialize_traci_conn``.
    """
    settings = config()
    retry_budget = self._traci_retries
    if not retry_budget:
        # No explicit (truthy) retry count on the instance: use the engine config.
        retry_budget = settings("sumo", "traci_retries", default=5, cast=int)
    self._initialize_traci_conn(num_retries=retry_budget)

def setup(self, scenario) -> ProviderState:
"""Initialize the simulation with a new scenario."""
self._log.debug("Setting up SumoTrafficSim %s", self)
Expand All @@ -316,24 +332,19 @@ def setup(self, scenario) -> ProviderState:
), "SumoTrafficSimulation requires a SumoRoadNetwork"
self._log_file = scenario.unique_sumo_log_file()

if restart_sumo:
try:
engine_config = config()
traci_retries = self._traci_retries or engine_config(
"sumo", "traci_retries", default=5, cast=int
)
self._initialize_traci_conn(num_retries=traci_retries)
except traci.exceptions.FatalTraCIError:
return ProviderState()
elif self._allow_reload:
assert (
self._traci_conn is not None
), "TraCI should be connected at this point."
try:
self._traci_conn.load(self._base_sumo_load_params())
except traci.exceptions.FatalTraCIError as err:
self._handle_traci_exception(err, actors_relinquishable=False)
return ProviderState()
try:
if restart_sumo:
self._restart_sumo()
elif self._allow_reload:
assert (
self._traci_conn is not None
), "TraCI should be connected at this point."
try:
self._traci_conn.load(self._base_sumo_load_params())
except traci.exceptions.FatalTraCIError:
self._restart_sumo()
except traci.exceptions.FatalTraCIError:
return ProviderState()

assert self._traci_conn is not None, "No active TraCI connection"

Expand Down Expand Up @@ -430,9 +441,19 @@ def teardown(self):
self._remove_vehicles()
except traci.exceptions.FatalTraCIError:
pass
if self._traci_conn is not None:
if not self._allow_reload and self._traci_conn is not None:
self._traci_conn.close_traci_and_pipes()

for i, trc in reversed(
[
(j, trc)
for j, trc in enumerate(self._foreign_traci_servers)
if not trc.viable
]
):
self._foreign_traci_servers.pop(i)
trc.close_traci_and_pipes(wait=False)

self._cumulative_sim_seconds = 0
self._non_sumo_vehicle_ids = set()
self._sumo_vehicle_ids = set()
Expand Down
38 changes: 11 additions & 27 deletions smarts/core/utils/sumo.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ def __init__(
self._sumo_version: Tuple[int, ...] = tuple()
self._host = host
self._name = name
# self._log = logging
self._log = logging.Logger(self.__class__.__name__)
# self._log.setLevel(logging.ERROR)
self._connected = False

if sumo_port is None:
Expand All @@ -124,7 +122,7 @@ def __init__(
def __del__(self) -> None:
# We should not raise in delete.
try:
self.close_traci_and_pipes()
self.close_traci_and_pipes(wait=False)
except Exception:
pass

Expand Down Expand Up @@ -171,7 +169,7 @@ def connect(
self._sumo_port,
err,
)
self.close_traci_and_pipes(kill=True)
self.close_traci_and_pipes()
raise
except ConnectionRefusedError:
self._log.error(
Expand All @@ -180,7 +178,7 @@ def connect(
self._host,
self._sumo_port,
)
self.close_traci_and_pipes(kill=True)
self.close_traci_and_pipes()
raise
self._connected = True
self._traci_conn = traci_conn
Expand All @@ -205,7 +203,7 @@ def connect(
)
# XXX: The error type is changed to TraCIException to make it consistent with the
# process-died case of `traci.connect`; TraCIException is fatal only in this case.
self.close_traci_and_pipes(kill=True)
self.close_traci_and_pipes()
raise traci.exceptions.TraCIException(err)
except OSError as err:
self._log.error(
Expand All @@ -215,7 +213,7 @@ def connect(
self._sumo_port,
err,
)
self.close_traci_and_pipes(kill=True)
self.close_traci_and_pipes()
raise traci.exceptions.TraCIException(err)
except ValueError:
self.close_traci_and_pipes()
Expand Down Expand Up @@ -272,11 +270,8 @@ def must_reset(self):
"""If the version of sumo will have errors if just reloading such that it must be reset."""
return self._sumo_version > (1, 12, 0)

def close_traci_and_pipes(self, wait: Optional[float] = 0, kill: bool = False):
def close_traci_and_pipes(self, wait: bool = True, kill: bool = True):
"""Safely closes all connections. We should expect this method to always work without throwing"""
assert wait is None or isinstance(wait, (int, float))
if isinstance(wait, (int, float)):
wait = max(0.0, wait)

def __safe_close(conn, **kwargs):
try:
Expand All @@ -295,27 +290,16 @@ def __safe_close(conn, **kwargs):

if self._connected:
self._log.debug("Closing TraCI connection to %s", self._sumo_port)
__safe_close(self._traci_conn, wait=bool(wait))
__safe_close(self._traci_conn, wait=wait)

if self._sumo_proc:
__safe_close(self._sumo_proc.stdin)
__safe_close(self._sumo_proc.stdout)
__safe_close(self._sumo_proc.stderr)
if wait:
try:
self._sumo_proc.wait(timeout=wait)
except subprocess.TimeoutExpired as err:
kill = True
self._log.error(
"TraCI server process shutdown timed out '%s:%s' [%s]",
self._host,
self._sumo_port,
err,
)
if kill:
self._sumo_proc.kill()
self._sumo_proc = None
self._log.error(
self._log.info(
"Killed TraCI server process '%s:%s", self._host, self._sumo_port
)

Expand All @@ -342,7 +326,7 @@ def _wrap_traci_method(
err,
)
# TraCI cannot continue
traci_conn.close_traci_and_pipes(kill=True)
traci_conn.close_traci_and_pipes()
raise traci.exceptions.FatalTraCIError("TraCI died.") from err
except OSError as err:
logging.error(
Expand All @@ -353,11 +337,11 @@ def _wrap_traci_method(
attribute_name,
err,
)
traci_conn.close_traci_and_pipes(kill=True)
traci_conn.close_traci_and_pipes()
raise OSError("Connection dropped.") from err
except traci.exceptions.TraCIException as err:
# Case where TraCI/SUMO can theoretically continue
raise traci.exceptions.TraCIException("TraCI can continue.") from err
except KeyboardInterrupt:
traci_conn.close_traci_and_pipes(kill=True)
traci_conn.close_traci_and_pipes(wait=False)
raise

0 comments on commit 0930736

Please sign in to comment.