diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java index cc6056522ef8..99dbbf1f56d7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; +import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -45,6 +46,9 @@ public class PipeConfigNodeRuntimeAgent implements IService { private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor = + new PipePeriodicalJobExecutor(); + @Override public synchronized void start() { PipeConfig.getInstance().printAllConfigs(); @@ -58,6 +62,9 @@ public synchronized void start() { // Clean receiver file dir PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + // Start periodical job executor + pipePeriodicalJobExecutor.start(); + isShutdown.set(false); LOGGER.info("PipeRuntimeConfigNodeAgent started"); } @@ -69,6 +76,9 @@ public synchronized void stop() { } isShutdown.set(true); + // Stop periodical job executor + pipePeriodicalJobExecutor.stop(); + PipeConfigNodeAgent.task().dropAllPipeTasks(); LOGGER.info("PipeRuntimeConfigNodeAgent stopped"); @@ -143,4 +153,10 @@ private void report( PipeConfigNodeAgent.task().stopAllPipesWithCriticalException(); } } + + /////////////////////////// Periodical Job Executor /////////////////////////// + + public void registerPeriodicalJob(String id, Runnable periodicalJob, long intervalInSeconds) { + pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java index 8705d5779428..59b012f06a03 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java @@ -24,9 +24,12 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; +import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType; +import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -42,9 +45,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent { +public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent + implements ReferenceTrackableEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigRegionSnapshotEvent.class); private String snapshotPath; @@ -259,4 +265,53 @@ public String coreReportMessage() { + " - " + super.coreReportMessage(); } + + /////////////////////////// ReferenceTrackableEvent /////////////////////////// + + @Override + protected void trackResource() { + PipeConfigNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder()); + } + + @Override + public PipeEventResource eventResourceBuilder() { + return new PipeConfigRegionSnapshotEventResource( + this.isReleased, + this.referenceCount, + this.resourceManager, + this.snapshotPath, + this.templateFilePath); + } + + private static class PipeConfigRegionSnapshotEventResource extends PipeEventResource { + + private final PipeSnapshotResourceManager resourceManager; + private final String snapshotPath; + private final String templateFilePath; + + private PipeConfigRegionSnapshotEventResource( + final AtomicBoolean isReleased, + final AtomicInteger referenceCount, + final PipeSnapshotResourceManager resourceManager, + final String snapshotPath, + final String templateFilePath) { + super(isReleased, referenceCount); + this.resourceManager = resourceManager; + this.snapshotPath = snapshotPath; + this.templateFilePath = templateFilePath; + } + + @Override + protected void finalizeResource() { + try { + resourceManager.decreaseSnapshotReference(snapshotPath); + if (!templateFilePath.isEmpty()) { + resourceManager.decreaseSnapshotReference(templateFilePath); + } + } catch (final Exception e) { + LOGGER.warn( + String.format("Decrease reference count for snapshot %s error.", snapshotPath), e); + } + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java index a7a41f4f3593..264ed997b84e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java @@ -43,6 +43,7 @@ public void bindTo(final AbstractMetricService metricService) { PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService); PipeTemporaryMetaMetrics.getInstance().bindTo(metricService); PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService); + PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService); } @Override @@ -55,5 +56,6 @@ public void unbindFrom(final AbstractMetricService metricService) { PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService); PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService); PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService); + PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java new file mode 100644 index 000000000000..d7b1967fc0ff --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeResourceMetrics.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.metric; + +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +public class PipeConfigNodeResourceMetrics implements IMetricSet { + + //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// + + @Override + public void bindTo(final AbstractMetricService metricService) { + // phantom reference count + metricService.createAutoGauge( + Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(), + MetricLevel.IMPORTANT, + PipeConfigNodeResourceManager.ref(), + PipePhantomReferenceManager::getPhantomReferenceCount); + } + + @Override + public void unbindFrom(final AbstractMetricService metricService) { + // phantom reference count + metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString()); + } + + //////////////////////////// singleton //////////////////////////// + + private static class PipeConfigNodeResourceMetricsHolder { + + private static final PipeConfigNodeResourceMetrics INSTANCE = + new PipeConfigNodeResourceMetrics(); + + private PipeConfigNodeResourceMetricsHolder() { + // empty constructor + } + } + + public static PipeConfigNodeResourceMetrics getInstance() { + return PipeConfigNodeResourceMetrics.PipeConfigNodeResourceMetricsHolder.INSTANCE; + } + + private PipeConfigNodeResourceMetrics() { + // empty constructor + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java index c13548c1d096..0dc88dcd2503 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java @@ -20,12 +20,15 @@ package org.apache.iotdb.confignode.manager.pipe.resource; import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; +import org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager; public class PipeConfigNodeResourceManager { private final PipeSnapshotResourceManager pipeSnapshotResourceManager; private final PipeLogManager pipeLogManager; + private final PipePhantomReferenceManager pipePhantomReferenceManager; public static PipeSnapshotResourceManager snapshot() { return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE @@ -36,11 +39,16 @@ public static PipeLogManager log() { return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager; } + public static PipePhantomReferenceManager ref() { + return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager; + } + ///////////////////////////// SINGLETON ///////////////////////////// private PipeConfigNodeResourceManager() { pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager(); pipeLogManager = new PipeLogManager(); + pipePhantomReferenceManager = new PipeConfigNodePhantomReferenceManager(); } private static class PipeResourceManagerHolder { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java new file mode 100644 index 000000000000..83627c40cb81 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/ref/PipeConfigNodePhantomReferenceManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.resource.ref; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; +import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; + +public class PipeConfigNodePhantomReferenceManager extends PipePhantomReferenceManager { + + public PipeConfigNodePhantomReferenceManager() { + super(); + + PipeConfigNodeAgent.runtime() + .registerPeriodicalJob( + "PipePhantomReferenceManager#gcHook()", + super::gcHook, + PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index cb9484c49e70..cbf2f17510ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; +import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java new file mode 100644 index 000000000000..b3d64d68e9c5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/ReferenceTrackableEvent.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event; + +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; + +public interface ReferenceTrackableEvent { + + PipeEventResource eventResourceBuilder(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java index 39f6aa30403a..fed4c0e9e594 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java @@ -24,6 +24,9 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; +import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; +import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; @@ -40,9 +43,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent { +public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent + implements ReferenceTrackableEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeSchemaRegionSnapshotEvent.class); private String mTreeSnapshotPath; private String tagLogSnapshotPath; @@ -239,4 +245,56 @@ public String coreReportMessage() { + " - " + super.coreReportMessage(); } + + /////////////////////////// ReferenceTrackableEvent /////////////////////////// + + @Override + protected void trackResource() { + PipeDataNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder()); + } + + @Override + public PipeEventResource eventResourceBuilder() { + return new PipeSchemaRegionSnapshotEventResource( + this.isReleased, + this.referenceCount, + this.resourceManager, + this.mTreeSnapshotPath, + this.tagLogSnapshotPath); + } + + private static class PipeSchemaRegionSnapshotEventResource extends PipeEventResource { + + private final PipeSnapshotResourceManager resourceManager; + private final String mTreeSnapshotPath; + private final String tagLogSnapshotPath; + + private PipeSchemaRegionSnapshotEventResource( + final AtomicBoolean isReleased, + final AtomicInteger referenceCount, + final PipeSnapshotResourceManager resourceManager, + final String mTreeSnapshotPath, + final String tagLogSnapshotPath) { + super(isReleased, referenceCount); + this.resourceManager = resourceManager; + this.mTreeSnapshotPath = mTreeSnapshotPath; + this.tagLogSnapshotPath = tagLogSnapshotPath; + } + + @Override + protected void finalizeResource() { + try { + resourceManager.decreaseSnapshotReference(mTreeSnapshotPath); + if (!tagLogSnapshotPath.isEmpty()) { + resourceManager.decreaseSnapshotReference(tagLogSnapshotPath); + } + } catch (final Exception e) { + LOGGER.warn( + String.format( + "Decrease reference count for mTree snapshot %s or tLog %s error.", + mTreeSnapshotPath, tagLogSnapshotPath), + e); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 7f88f08cf6ff..bd36eb3e719a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; +import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; @@ -54,11 +56,13 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent - implements TabletInsertionEvent { + implements TabletInsertionEvent, ReferenceTrackableEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class); @@ -453,4 +457,43 @@ public String coreReportMessage() { + " - " + super.coreReportMessage(); } + + /////////////////////////// ReferenceTrackableEvent /////////////////////////// + + @Override + protected void trackResource() { + PipeDataNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder()); + } + + @Override + public PipeEventResource eventResourceBuilder() { + return new PipeInsertNodeTabletInsertionEventResource( + this.isReleased, this.referenceCount, this.walEntryHandler); + } + + private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource { + + private final WALEntryHandler walEntryHandler; + + private PipeInsertNodeTabletInsertionEventResource( + final AtomicBoolean isReleased, + final AtomicInteger referenceCount, + final WALEntryHandler walEntryHandler) { + super(isReleased, referenceCount); + this.walEntryHandler = walEntryHandler; + } + + @Override + protected void finalizeResource() { + try { + PipeDataNodeResourceManager.wal().unpin(walEntryHandler); + // no need to release the containers' memory because it has already been GCed + } catch (final Exception e) { + LOGGER.warn( + String.format( + "Decrease reference count for memTable %d error.", walEntryHandler.getMemTableId()), + e); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index c4b51db219a6..98e50e5104d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -26,7 +26,9 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; @@ -41,10 +43,12 @@ import org.apache.tsfile.write.record.Tablet; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; public class PipeRawTabletInsertionEvent extends PipeInsertionEvent - implements TabletInsertionEvent { + implements TabletInsertionEvent, ReferenceTrackableEvent { private Tablet tablet; private String deviceId; // Only used when the tablet is released. @@ -365,4 +369,35 @@ public String coreReportMessage() { + " - " + super.coreReportMessage(); } + + /////////////////////////// ReferenceTrackableEvent /////////////////////////// + + @Override + protected void trackResource() { + PipeDataNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder()); + } + + @Override + public PipeEventResource eventResourceBuilder() { + return new PipeRawTabletInsertionEventResource( + this.isReleased, this.referenceCount, this.allocatedMemoryBlock); + } + + private static class PipeRawTabletInsertionEventResource extends PipeEventResource { + + private final PipeTabletMemoryBlock allocatedMemoryBlock; + + private PipeRawTabletInsertionEventResource( + final AtomicBoolean isReleased, + final AtomicInteger referenceCount, + final PipeTabletMemoryBlock allocatedMemoryBlock) { + super(isReleased, referenceCount); + this.allocatedMemoryBlock = allocatedMemoryBlock; + } + + @Override + protected void finalizeResource() { + allocatedMemoryBlock.close(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d71c6f9d0298..09aa2f9854cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; +import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; @@ -51,11 +53,13 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; -public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFileInsertionEvent { +public class PipeTsFileInsertionEvent extends PipeInsertionEvent + implements TsFileInsertionEvent, ReferenceTrackableEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class); @@ -558,4 +562,49 @@ public String coreReportMessage() { + " - " + super.coreReportMessage(); } + + /////////////////////////// ReferenceTrackableEvent /////////////////////////// + + @Override + public void trackResource() { + PipeDataNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder()); + } + + @Override + public PipeEventResource eventResourceBuilder() { + return new PipeTsFileInsertionEventResource( + this.isReleased, this.referenceCount, this.tsFile, this.isWithMod, this.modFile); + } + + private static class PipeTsFileInsertionEventResource extends PipeEventResource { + + private final File tsFile; + private final boolean isWithMod; + private final File modFile; + + private PipeTsFileInsertionEventResource( + final AtomicBoolean isReleased, + final AtomicInteger referenceCount, + final File tsFile, + final boolean isWithMod, + final File modFile) { + super(isReleased, referenceCount); + this.tsFile = tsFile; + this.isWithMod = isWithMod; + this.modFile = modFile; + } + + @Override + protected void finalizeResource() { + try { + PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile); + if (isWithMod) { + PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile); + } + } catch (final Exception e) { + LOGGER.warn( + String.format("Decrease reference count for TsFile %s error.", tsFile.getPath()), e); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java index 04fb647a8259..854ff9ffb106 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.metric; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -66,6 +67,12 @@ public void bindTo(final AbstractMetricService metricService) { MetricLevel.IMPORTANT, PipeDataNodeResourceManager.tsfile(), PipeTsFileResourceManager::getLinkedTsfileCount); + // phantom reference count + metricService.createAutoGauge( + Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(), + MetricLevel.IMPORTANT, + PipeDataNodeResourceManager.ref(), + PipePhantomReferenceManager::getPhantomReferenceCount); } @Override @@ -78,6 +85,8 @@ public void unbindFrom(final AbstractMetricService metricService) { // resource reference count metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PINNED_MEMTABLE_COUNT.toString()); metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_LINKED_TSFILE_COUNT.toString()); + // phantom reference count + metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString()); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java index 74afd58681f5..573106e45c2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java @@ -21,8 +21,10 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; +import org.apache.iotdb.db.pipe.resource.ref.PipeDataNodePhantomReferenceManager; import org.apache.iotdb.db.pipe.resource.snapshot.PipeDataNodeSnapshotResourceManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager; @@ -38,6 +40,7 @@ public class PipeDataNodeResourceManager { private final PipeSnapshotResourceManager pipeSnapshotResourceManager; private final PipeMemoryManager pipeMemoryManager; private final PipeLogManager pipeLogManager; + private final PipePhantomReferenceManager pipePhantomReferenceManager; public static PipeTsFileResourceManager tsfile() { return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager; @@ -69,6 +72,10 @@ public static PipeLogManager log() { return PipeResourceManagerHolder.INSTANCE.pipeLogManager; } + public static PipePhantomReferenceManager ref() { + return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager; + } + ///////////////////////////// SINGLETON ///////////////////////////// private PipeDataNodeResourceManager() { @@ -77,6 +84,7 @@ private PipeDataNodeResourceManager() { pipeSnapshotResourceManager = new PipeDataNodeSnapshotResourceManager(); pipeMemoryManager = new PipeMemoryManager(); pipeLogManager = new PipeLogManager(); + pipePhantomReferenceManager = new PipeDataNodePhantomReferenceManager(); } private static class PipeResourceManagerHolder { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java new file mode 100644 index 000000000000..fa9f87a1f19f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/ref/PipeDataNodePhantomReferenceManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.ref; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; + +public class PipeDataNodePhantomReferenceManager extends PipePhantomReferenceManager { + + public PipeDataNodePhantomReferenceManager() { + super(); + + PipeDataNodeAgent.runtime() + .registerPeriodicalJob( + "PipePhantomReferenceManager#gcHook()", + super::gcHook, + PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index e73679561267..02ba8552ee21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -20,10 +20,10 @@ package org.apache.iotdb.db.pipe.resource.tsfile; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; -import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java index 6e62a819c696..49d90f255684 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.subscription.event; -import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; @@ -97,7 +97,7 @@ private SubscriptionEventBinaryCache() { final long maxMemorySizeInBytes = (long) (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() - * PipeConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage()); + * SubscriptionConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage()); // properties required by pipe memory control framework final PipeMemoryBlock allocatedMemoryBlock = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 46ad1c14bbf9..4d29f22482a7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -274,6 +274,9 @@ public class CommonConfig { private float subscriptionCacheMemoryUsagePercentage = 0.2F; + private boolean pipeEventReferenceTrackingEnabled = false; // TODO: enable later + private long pipeEventReferenceEliminateIntervalSeconds = 10; + private int subscriptionSubtaskExecutorMaxThreadNum = Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); private int subscriptionPrefetchTabletBatchMaxDelayInMs = 1000; // 1s @@ -1182,6 +1185,23 @@ public void setTwoStageAggregateSenderEndPointsCacheInMs( this.twoStageAggregateSenderEndPointsCacheInMs = twoStageAggregateSenderEndPointsCacheInMs; } + public boolean getPipeEventReferenceTrackingEnabled() { + return pipeEventReferenceTrackingEnabled; + } + + public void setPipeEventReferenceTrackingEnabled(boolean pipeEventReferenceTrackingEnabled) { + this.pipeEventReferenceTrackingEnabled = pipeEventReferenceTrackingEnabled; + } + + public long getPipeEventReferenceEliminateIntervalSeconds() { + return pipeEventReferenceEliminateIntervalSeconds; + } + + public void setPipeEventReferenceEliminateIntervalSeconds( + long pipeEventReferenceEliminateIntervalSeconds) { + this.pipeEventReferenceEliminateIntervalSeconds = pipeEventReferenceEliminateIntervalSeconds; + } + public float getSubscriptionCacheMemoryUsagePercentage() { return subscriptionCacheMemoryUsagePercentage; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 48f7aad2391a..cccee5e7a23c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -604,6 +604,17 @@ private void loadPipeProps(Properties properties) { properties.getProperty( "two_stage_aggregate_sender_end_points_cache_in_ms", String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs())))); + + config.setPipeEventReferenceTrackingEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_event_reference_tracking_enabled", + String.valueOf(config.getPipeEventReferenceTrackingEnabled())))); + config.setPipeEventReferenceEliminateIntervalSeconds( + Long.parseLong( + properties.getProperty( + "pipe_event_reference_eliminate_interval_seconds", + String.valueOf(config.getPipeEventReferenceEliminateIntervalSeconds())))); } private void loadSubscriptionProps(Properties properties) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java similarity index 94% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java index fb3c87d7ef3e..ca972f1cdd01 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent.runtime; +package org.apache.iotdb.commons.pipe.agent.runtime; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.db.service.DataNode; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; @@ -38,8 +37,8 @@ import java.util.concurrent.TimeUnit; /** - * Single thread to execute pipe periodical jobs on {@link DataNode}. This is for limiting the - * thread num on the {@link DataNode} instance. + * Single thread to execute pipe periodical jobs on DataNode or ConfigNode. This is for limiting the + * thread num on the DataNode or ConfigNode instance. */ public class PipePeriodicalJobExecutor { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 957065286cca..5f1571d7ebfa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -308,10 +308,14 @@ public long getTwoStageAggregateSenderEndPointsCacheInMs() { return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs(); } - /////////////////////////////// Subscription /////////////////////////////// + /////////////////////////////// Ref /////////////////////////////// - public float getSubscriptionCacheMemoryUsagePercentage() { - return COMMON_CONFIG.getSubscriptionCacheMemoryUsagePercentage(); + public boolean getPipeEventReferenceTrackingEnabled() { + return COMMON_CONFIG.getPipeEventReferenceTrackingEnabled(); + } + + public long getPipeEventReferenceEliminateIntervalSeconds() { + return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds(); } /////////////////////////////// Utils /////////////////////////////// @@ -450,8 +454,10 @@ public void printAllConfigs() { "TwoStageAggregateSenderEndPointsCacheInMs: {}", getTwoStageAggregateSenderEndPointsCacheInMs()); + LOGGER.info("PipeEventReferenceTrackingEnabled: {}", getPipeEventReferenceTrackingEnabled()); LOGGER.info( - "SubscriptionCacheMemoryUsagePercentage: {}", getSubscriptionCacheMemoryUsagePercentage()); + "PipeEventReferenceEliminateIntervalSeconds: {}", + getPipeEventReferenceEliminateIntervalSeconds()); } /////////////////////////////// Singleton /////////////////////////////// diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index ee70ff5286c5..2fdef0730a81 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.pipe.api.event.Event; @@ -110,6 +111,10 @@ protected EnrichedEvent( }); } + protected void trackResource() { + // do nothing by default + } + /** * Increase the {@link EnrichedEvent#referenceCount} of this event. When the {@link * EnrichedEvent#referenceCount} is positive, the data in the resource of this {@link @@ -137,7 +142,10 @@ public synchronized boolean increaseReferenceCount(final String holderMessage) { } if (isSuccessful) { - referenceCount.incrementAndGet(); + if (referenceCount.incrementAndGet() == 1 + && PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) { + trackResource(); + } } else { LOGGER.warn( "increase reference count failed, EnrichedEvent: {}, stack trace: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java new file mode 100644 index 000000000000..7422221c3d3d --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.resource.ref; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class PipePhantomReferenceManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipePhantomReferenceManager.class); + + private static final Set PIPE_EVENT_PHANTOM_REFERENCES = + ConcurrentHashMap.newKeySet(); + + private static final ReferenceQueue REFERENCE_QUEUE = new ReferenceQueue<>(); + + public PipePhantomReferenceManager() { + // Do nothing now. + } + + public int getPhantomReferenceCount() { + return PIPE_EVENT_PHANTOM_REFERENCES.size(); + } + + protected void gcHook() { + if (!PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) { + return; + } + + Reference reference; + try { + while ((reference = REFERENCE_QUEUE.remove(500)) != null) { + finalizeResource((PipeEventPhantomReference) reference); + } + } catch (final InterruptedException e) { + // Finalize remaining references. + while ((reference = REFERENCE_QUEUE.poll()) != null) { + finalizeResource((PipeEventPhantomReference) reference); + } + } catch (final Exception e) { + // Nowhere to really log this. + } + } + + private void finalizeResource(final PipeEventPhantomReference reference) { + try { + reference.finalizeResources(); + reference.clear(); + } finally { + PIPE_EVENT_PHANTOM_REFERENCES.remove(reference); + } + } + + private static class PipeEventPhantomReference extends PhantomReference { + + private final String holderMessage; + private PipeEventResource resource; + + private PipeEventPhantomReference( + final EnrichedEvent event, + final PipeEventResource resource, + final ReferenceQueue queue) { + super(event, queue); + this.holderMessage = event.getClass().getSimpleName(); + this.resource = resource; + } + + private void finalizeResources() { + if (this.resource != null) { + try { + this.resource.clearReferenceCount(holderMessage); + } finally { + this.resource = null; + } + } + } + } + + ///////////////////// APIs provided for EnrichedEvent ///////////////////// + + public void trackPipeEventResource(final EnrichedEvent event, final PipeEventResource resource) { + final PipeEventPhantomReference reference = + new PipeEventPhantomReference(event, resource, REFERENCE_QUEUE); + PIPE_EVENT_PHANTOM_REFERENCES.add(reference); + } + + public abstract static class PipeEventResource { + + private final AtomicBoolean isReleased; + private final AtomicInteger referenceCount; + + protected PipeEventResource( + final AtomicBoolean isReleased, final AtomicInteger referenceCount) { + this.isReleased = isReleased; + this.referenceCount = referenceCount; + } + + private void clearReferenceCount(final String holderMessage) { + if (isReleased.get()) { + return; + } + + if (referenceCount.get() >= 1) { + LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED: {}", holderMessage); + finalizeResource(); + } + + referenceCount.set(0); + isReleased.set(true); + } + + protected abstract void finalizeResource(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 7c2f4ac10168..764b7bcaf2f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -152,6 +152,7 @@ public enum Metric { PIPE_MEM("pipe_mem"), PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"), PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"), + PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"), PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"), PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"), PIPE_PROCEDURE("pipe_procedure"), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java index ccbd838738e0..8a894bc5df5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java @@ -31,6 +31,10 @@ public class SubscriptionConfig { /////////////////////////////// Subtask Executor /////////////////////////////// + public float getSubscriptionCacheMemoryUsagePercentage() { + return COMMON_CONFIG.getSubscriptionCacheMemoryUsagePercentage(); + } + public int getSubscriptionSubtaskExecutorMaxThreadNum() { return COMMON_CONFIG.getSubscriptionSubtaskExecutorMaxThreadNum(); } @@ -84,6 +88,8 @@ public long getSubscriptionTsFileDeduplicationWindowSeconds() { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConfig.class); public void printAllConfigs() { + LOGGER.info( + "SubscriptionCacheMemoryUsagePercentage: {}", getSubscriptionCacheMemoryUsagePercentage()); LOGGER.info( "SubscriptionSubtaskExecutorMaxThreadNum: {}", getSubscriptionSubtaskExecutorMaxThreadNum());