Skip to content

Commit

Permalink
Pipe: Remove and close asynchronous connector manager when all client…
Browse files Browse the repository at this point in the history
…s are useless (#13399) (#13476)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
YC27 and SteveYurongSu authored Sep 11, 2024
1 parent 8662432 commit a798425
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -62,6 +61,11 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager

private final Set<TEndPoint> endPointSet;

private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
new ConcurrentHashMap<>();
private final String receiverAttributes;

// receiverAttributes -> IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
private static final Map<String, IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new ConcurrentHashMap<>();
private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> endPoint2Client;
Expand All @@ -82,18 +86,19 @@ public IoTDBDataNodeAsyncClientManager(

endPointSet = new HashSet<>(endPoints);

final String receiverAttributes =
receiverAttributes =
String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy);
if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
synchronized (IoTDBDataRegionAsyncConnector.class) {
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
receiverAttributes,
new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
synchronized (IoTDBDataNodeAsyncClientManager.class) {
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
receiverAttributes,
new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);

RECEIVER_ATTRIBUTES_REF_COUNT.compute(
receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : refCount + 1);
}
endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);

switch (loadBalanceStrategy) {
case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
Expand Down Expand Up @@ -288,6 +293,28 @@ public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint);
}

public void close() {
synchronized (IoTDBDataNodeAsyncClientManager.class) {
RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
receiverAttributes,
(attributes, refCount) -> {
if (refCount <= 1) {
final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
if (clientManager != null) {
try {
clientManager.close();
} catch (Exception e) {
LOGGER.warn("Failed to close client manager.", e);
}
}
return null;
}
return refCount - 1;
});
}
}

/////////////////////// Strategies for load balance //////////////////////////

private interface LoadBalancer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,14 @@ public synchronized void close() {
tabletBatchBuilder.close();
}

try {
if (clientManager != null) {
clientManager.close();
}
} catch (final Exception e) {
LOGGER.warn("Failed to close client manager.", e);
}

super.close();
}

Expand Down

0 comments on commit a798425

Please sign in to comment.