From a798425d1aa4b3b88a418137641a95dc1a3eace9 Mon Sep 17 00:00:00 2001 From: YC27 <76414902+YC27@users.noreply.github.com> Date: Wed, 11 Sep 2024 12:47:12 +0800 Subject: [PATCH] Pipe: Remove and close asynchronous connector manager when all clients are useless (#13399) (#13476) Co-authored-by: Steve Yurong Su --- .../IoTDBDataNodeAsyncClientManager.java | 49 ++++++++++++++----- .../async/IoTDBDataRegionAsyncConnector.java | 8 +++ 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index bfe5360e7492..391545b74196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; -import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -62,6 +61,11 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager private final Set endPointSet; + private static final Map RECEIVER_ATTRIBUTES_REF_COUNT = + new ConcurrentHashMap<>(); + private final String receiverAttributes; + + // receiverAttributes -> IClientManager private static final Map> ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new ConcurrentHashMap<>(); private final IClientManager endPoint2Client; @@ -82,18 +86,19 @@ public IoTDBDataNodeAsyncClientManager( endPointSet = new HashSet<>(endPoints); - final String receiverAttributes = + receiverAttributes = String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy); - if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) { - synchronized (IoTDBDataRegionAsyncConnector.class) { - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( - receiverAttributes, - new IClientManager.Factory() - .createClientManager( - new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory())); - } + synchronized (IoTDBDataNodeAsyncClientManager.class) { + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( + receiverAttributes, + new IClientManager.Factory() + .createClientManager( + new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory())); + endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes); + + RECEIVER_ATTRIBUTES_REF_COUNT.compute( + receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : refCount + 1); } - endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes); switch (loadBalanceStrategy) { case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY: @@ -288,6 +293,28 @@ public void updateLeaderCache(String deviceId, TEndPoint endPoint) { LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint); } + public void close() { + synchronized (IoTDBDataNodeAsyncClientManager.class) { + RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent( + receiverAttributes, + (attributes, refCount) -> { + if (refCount <= 1) { + final IClientManager clientManager = + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes); + if (clientManager != null) { + try { + clientManager.close(); + } catch (Exception e) { + LOGGER.warn("Failed to close client manager.", e); + } + } + return null; + } + return refCount - 1; + }); + } + } + /////////////////////// Strategies for load balance ////////////////////////// private interface LoadBalancer { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 5e13c4e0243b..f68d86dd7e89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -539,6 +539,14 @@ public synchronized void close() { tabletBatchBuilder.close(); } + try { + if (clientManager != null) { + clientManager.close(); + } + } catch (final Exception e) { + LOGGER.warn("Failed to close client manager.", e); + } + super.close(); }