From 0930736ce8904438947603ec148009d8218b84f9 Mon Sep 17 00:00:00 2001 From: Tucker Date: Mon, 22 Jan 2024 14:01:41 -0500 Subject: [PATCH] Attempt to patch #2139. --- smarts/core/sumo_traffic_simulation.py | 85 ++++++++++++++++---------- smarts/core/utils/sumo.py | 38 ++++-------- 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/smarts/core/sumo_traffic_simulation.py b/smarts/core/sumo_traffic_simulation.py index 76cf6921a6..7ff882267b 100644 --- a/smarts/core/sumo_traffic_simulation.py +++ b/smarts/core/sumo_traffic_simulation.py @@ -125,6 +125,8 @@ def __init__( self._sim = None self._handling_error = False self._traci_retries = traci_retries + # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) + self._foreign_traci_servers: List[TraciConn] = [] # start with the default recovery flags... self._recovery_flags = super().recovery_flags @@ -209,18 +211,25 @@ def _initialize_traci_conn(self, num_retries=5): ) try: - while not self._traci_conn.connected: - try: - self._traci_conn.connect( - timeout=5, - minimum_traci_version=20, - minimum_sumo_version=(1, 10, 0), - debug=self._debug, - ) - except traci.exceptions.FatalTraCIError: - # Could not connect in time just retry connection - pass + self._traci_conn.connect( + timeout=5, + minimum_traci_version=20, + minimum_sumo_version=(1, 10, 0), + debug=self._debug, + ) + + if not self._traci_conn.connected: + # Save the connection to try to avoid closing it for the other client. + self._foreign_traci_servers.append(self._traci_conn) + self._traci_conn = None + raise traci.exceptions.TraCIException( + "TraCI server was likely taken by other client." + ) + except traci.exceptions.FatalTraCIError: + # Could not connect in time just retry connection + current_retries += 1 + continue except traci.exceptions.TraCIException: # SUMO process died... unsure why this is not a fatal traci error current_retries += 1 @@ -230,7 +239,7 @@ def _initialize_traci_conn(self, num_retries=5): continue except KeyboardInterrupt: self._log.debug("Keyboard interrupted TraCI connection.") - self._traci_conn.close_traci_and_pipes() + self._traci_conn.close_traci_and_pipes(wait=False) raise break else: @@ -257,7 +266,7 @@ def _base_sumo_load_params(self): "--net-file=%s" % self._scenario.road_map.source, "--quit-on-end", "--log=%s" % self._log_file, - "--error-log=%s" % self._log_file, + "--error-log=%s.err" % self._log_file, "--no-step-log", "--no-warnings=1", "--seed=%s" % random.randint(0, 2147483648), @@ -293,6 +302,13 @@ def _base_sumo_load_params(self): return load_params + def _restart_sumo(self): + engine_config = config() + traci_retries = self._traci_retries or engine_config( + "sumo", "traci_retries", default=5, cast=int + ) + self._initialize_traci_conn(num_retries=traci_retries) + def setup(self, scenario) -> ProviderState: """Initialize the simulation with a new scenario.""" self._log.debug("Setting up SumoTrafficSim %s", self) @@ -316,24 +332,19 @@ def setup(self, scenario) -> ProviderState: ), "SumoTrafficSimulation requires a SumoRoadNetwork" self._log_file = scenario.unique_sumo_log_file() - if restart_sumo: - try: - engine_config = config() - traci_retries = self._traci_retries or engine_config( - "sumo", "traci_retries", default=5, cast=int - ) - self._initialize_traci_conn(num_retries=traci_retries) - except traci.exceptions.FatalTraCIError: - return ProviderState() - elif self._allow_reload: - assert ( - self._traci_conn is not None - ), "TraCI should be connected at this point." - try: - self._traci_conn.load(self._base_sumo_load_params()) - except traci.exceptions.FatalTraCIError as err: - self._handle_traci_exception(err, actors_relinquishable=False) - return ProviderState() + try: + if restart_sumo: + self._restart_sumo() + elif self._allow_reload: + assert ( + self._traci_conn is not None + ), "TraCI should be connected at this point." + try: + self._traci_conn.load(self._base_sumo_load_params()) + except traci.exceptions.FatalTraCIError: + self._restart_sumo() + except traci.exceptions.FatalTraCIError: + return ProviderState() assert self._traci_conn is not None, "No active TraCI connection" @@ -430,9 +441,19 @@ def teardown(self): self._remove_vehicles() except traci.exceptions.FatalTraCIError: pass - if self._traci_conn is not None: + if not self._allow_reload and self._traci_conn is not None: self._traci_conn.close_traci_and_pipes() + for i, trc in reversed( + [ + (j, trc) + for j, trc in enumerate(self._foreign_traci_servers) + if not trc.viable + ] + ): + self._foreign_traci_servers.pop(i) + trc.close_traci_and_pipes(wait=False) + self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() self._sumo_vehicle_ids = set() diff --git a/smarts/core/utils/sumo.py b/smarts/core/utils/sumo.py index 2f162c295a..9c27fe992d 100644 --- a/smarts/core/utils/sumo.py +++ b/smarts/core/utils/sumo.py @@ -98,9 +98,7 @@ def __init__( self._sumo_version: Tuple[int, ...] = tuple() self._host = host self._name = name - # self._log = logging self._log = logging.Logger(self.__class__.__name__) - # self._log.setLevel(logging.ERROR) self._connected = False if sumo_port is None: @@ -124,7 +122,7 @@ def __init__( def __del__(self) -> None: # We should not raise in delete. try: - self.close_traci_and_pipes() + self.close_traci_and_pipes(wait=False) except Exception: pass @@ -171,7 +169,7 @@ def connect( self._sumo_port, err, ) - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise except ConnectionRefusedError: self._log.error( @@ -180,7 +178,7 @@ def connect( self._host, self._sumo_port, ) - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise self._connected = True self._traci_conn = traci_conn @@ -205,7 +203,7 @@ def connect( ) # XXX: the error type is changed to TraCIException to make it consistent with the # process died case of `traci.connect`. Since TraCIException is fatal just in this case... - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise traci.exceptions.TraCIException(err) except OSError as err: self._log.error( @@ -215,7 +213,7 @@ def connect( self._sumo_port, err, ) - self.close_traci_and_pipes(kill=True) + self.close_traci_and_pipes() raise traci.exceptions.TraCIException(err) except ValueError: self.close_traci_and_pipes() @@ -272,11 +270,8 @@ def must_reset(self): """If the version of sumo will have errors if just reloading such that it must be reset.""" return self._sumo_version > (1, 12, 0) - def close_traci_and_pipes(self, wait: Optional[float] = 0, kill: bool = False): + def close_traci_and_pipes(self, wait: bool = True, kill: bool = True): """Safely closes all connections. We should expect this method to always work without throwing""" - assert wait is None or isinstance(wait, (int, float)) - if isinstance(wait, (int, float)): - wait = max(0.0, wait) def __safe_close(conn, **kwargs): try: @@ -295,27 +290,16 @@ def __safe_close(conn, **kwargs): if self._connected: self._log.debug("Closing TraCI connection to %s", self._sumo_port) - __safe_close(self._traci_conn, wait=bool(wait)) + __safe_close(self._traci_conn, wait=wait) if self._sumo_proc: __safe_close(self._sumo_proc.stdin) __safe_close(self._sumo_proc.stdout) __safe_close(self._sumo_proc.stderr) - if wait: - try: - self._sumo_proc.wait(timeout=wait) - except subprocess.TimeoutExpired as err: - kill = True - self._log.error( - "TraCI server process shutdown timed out '%s:%s' [%s]", - self._host, - self._sumo_port, - err, - ) if kill: self._sumo_proc.kill() self._sumo_proc = None - self._log.error( + self._log.info( "Killed TraCI server process '%s:%s", self._host, self._sumo_port ) @@ -342,7 +326,7 @@ def _wrap_traci_method( err, ) # TraCI cannot continue - traci_conn.close_traci_and_pipes(kill=True) + traci_conn.close_traci_and_pipes() raise traci.exceptions.FatalTraCIError("TraCI died.") from err except OSError as err: logging.error( @@ -353,11 +337,11 @@ def _wrap_traci_method( attribute_name, err, ) - traci_conn.close_traci_and_pipes(kill=True) + traci_conn.close_traci_and_pipes() raise OSError("Connection dropped.") from err except traci.exceptions.TraCIException as err: # Case where TraCI/SUMO can theoretically continue raise traci.exceptions.TraCIException("TraCI can continue.") from err except KeyboardInterrupt: - traci_conn.close_traci_and_pipes(kill=True) + traci_conn.close_traci_and_pipes(wait=False) raise