diff --git a/CHANGELOG.md b/CHANGELOG.md index 933f8aa46..7ee19cf54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Fixed Hive performance regression by streamlining materialization type acquisition ([557](https://github.com/databricks/dbt-databricks/pull/557)) - Fix: Python models authentication could be overridden by a `.netrc` file in the user's home directory ([338](https://github.com/databricks/dbt-databricks/pull/338)) - Fix: MV/ST REST api authentication could be overriden by a `.netrc` file in the user's home directory ([555](https://github.com/databricks/dbt-databricks/pull/555)) +- Updated connection debug logging; the connection's last used time is now set on session open ([565](https://github.com/databricks/dbt-databricks/pull/565)) ### Under the Hood diff --git a/dbt/adapters/databricks/connections.py b/dbt/adapters/databricks/connections.py index b696fb453..9f7d3e0c9 100644 --- a/dbt/adapters/databricks/connections.py +++ b/dbt/adapters/databricks/connections.py @@ -771,9 +771,10 @@ class DatabricksDBTConnection(Connection): # the next time it is used. language: Optional[str] = None + session_id: Optional[str] = None + def _acquire(self, node: Optional[ResultNode]) -> None: """Indicate that this connection is in use.""" - logger.debug(f"DatabricksDBTConnection._acquire: {self._get_conn_info_str()}") self._log_usage(node) self.acquire_release_count += 1 if self.last_used_time is None: @@ -783,9 +784,10 @@ def _acquire(self, node: Optional[ResultNode]) -> None: else: self.language = None + self._log_info("_acquire") + def _release(self) -> None: """Indicate that this connection is not in use.""" - logger.debug(f"DatabricksDBTConnection._release: {self._get_conn_info_str()}") # Need to check for > 0 because in some situations the dbt code will make an extra # release call on a connection. 
if self.acquire_release_count > 0: @@ -796,6 +798,8 @@ def _release(self) -> None: if self.acquire_release_count == 0 and self.language != "python": self.last_used_time = time.time() + self._log_info("_release") + def _get_idle_time(self) -> float: return 0 if self.last_used_time is None else time.time() - self.last_used_time @@ -805,11 +809,15 @@ def _idle_too_long(self) -> bool: def _get_conn_info_str(self) -> str: """Generate a string describing this connection.""" return ( - f"name: {self.name}, thread: {self.thread_identifier}, " - f"compute: `{self.compute_name}`, acquire_release_count: {self.acquire_release_count}," - f" idle time: {self._get_idle_time()}s, language: {self.language}" + f"sess: {self.session_id}, name: {self.name}, " + f"idle: {self._get_idle_time()}s, acqrelcnt: {self.acquire_release_count}, " + f"lang: {self.language}, thrd: {self.thread_identifier}, " + f"cmpt: `{self.compute_name}`, lut: {self.last_used_time}" ) + def _log_info(self, caller: Optional[str]) -> None: + logger.debug(f"conn: {id(self)}: {caller} {self._get_conn_info_str()}") + def _log_usage(self, node: Optional[ResultNode]) -> None: if node: if not self.compute_name: @@ -826,8 +834,9 @@ def _log_usage(self, node: Optional[ResultNode]) -> None: logger.debug(f"Thread {self.thread_identifier} using default compute resource.") def _reset_handle(self, open: Callable[[Connection], Connection]) -> None: - logger.debug(f"DatabricksDBTConnection._reset_handle: {self._get_conn_info_str()}") + self._log_info("_reset_handle") self.handle = LazyHandle(open) + self.session_id = None # Reset last_used_time to None because by refreshing this connection becomes associated # with a new session that hasn't been used yet. 
self.last_used_time = None @@ -1016,10 +1025,11 @@ def _update_compute_connection( # Found a connection and nothing to do, so just return it return conn + orig_conn_name: str = conn.name or "" + if conn.state != ConnectionState.OPEN: conn.handle = LazyHandle(self._open2) if conn.name != new_name: - orig_conn_name: str = conn.name or "" conn.name = new_name fire_event(ConnectionReused(orig_conn_name=orig_conn_name, conn_name=new_name)) @@ -1028,7 +1038,7 @@ def _update_compute_connection( self.clear_thread_connection() self.set_thread_connection(conn) - logger.debug(f"Reusing DatabricksDBTConnection. {conn._get_conn_info_str()}") + conn._log_info(f"reusing connection {orig_conn_name}") return conn @@ -1044,10 +1054,7 @@ def _create_compute_connection( # Create a new connection compute_name = _get_compute_name(node=node) or "" - logger.debug( - f"Creating DatabricksDBTConnection. name: {conn_name}, " - f"thread: {self.get_thread_identifier()}, compute: `{compute_name}`" - ) + conn = DatabricksDBTConnection( type=Identifier(self.TYPE), name=conn_name, @@ -1063,6 +1070,9 @@ def _create_compute_connection( conn.max_idle_time = _get_max_idle_time(node=node, creds=creds) conn.handle = LazyHandle(self._open2) + + conn._log_info("Creating DatabricksDBTConnection") + # Add this connection to the thread/compute connection pool. self._add_compute_connection(conn) # Remove the connection currently in use by this thread from the thread connection pool. @@ -1129,6 +1139,8 @@ def _cleanup_idle_connections(self) -> None: # if different models use different compute resources thread_conns = self._get_compute_connections() for conn in thread_conns.values(): + conn._log_info("idle check connection:") + # Generally speaking we only want to close/refresh the connection if the # acquire_release_count is zero. i.e. the connection is not currently in use. 
# However python models acquire a connection then run the pyton model, which @@ -1142,15 +1154,18 @@ def _cleanup_idle_connections(self) -> None: if ( conn.acquire_release_count == 0 or conn.language == "python" ) and conn._idle_too_long(): - logger.debug(f"closing idle connection: {conn._get_conn_info_str()}") + conn._log_info("closing idle connection") self.close(conn) conn._reset_handle(self._open2) def get_thread_connection(self) -> Connection: + conn = super().get_thread_connection() + dbr_conn = cast(DatabricksDBTConnection, conn) + dbr_conn._log_info("get_thread_connection:") if USE_LONG_SESSIONS: self._cleanup_idle_connections() - return super().get_thread_connection() + return conn def add_query( self, @@ -1380,8 +1395,10 @@ def _open2(cls, connection: Connection) -> Connection: USE_LONG_SESSIONS ), "This path, '_open2', should only be reachable with USE_LONG_SESSIONS" + databricks_connection = cast(DatabricksDBTConnection, connection) + if connection.state == ConnectionState.OPEN: - logger.debug("Connection is already open, skipping open.") + databricks_connection._log_info("Connection is already open, skipping open.") return connection creds: DatabricksCredentials = connection.credentials @@ -1404,7 +1421,7 @@ def _open2(cls, connection: Connection) -> Connection: # If a model specifies a compute resource the http path # may be different than the http_path property of creds. - http_path = cast(DatabricksDBTConnection, connection).http_path + http_path = databricks_connection.http_path def connect() -> DatabricksSQLConnectionWrapper: try: @@ -1421,6 +1438,12 @@ def connect() -> DatabricksSQLConnectionWrapper: _user_agent_entry=user_agent_entry, **connection_parameters, ) + + if conn: + databricks_connection.session_id = conn.get_session_id_hex() + databricks_connection.last_used_time = time.time() + databricks_connection._log_info("session opened") + return DatabricksSQLConnectionWrapper( conn, is_cluster=creds.cluster_id is not None,