Skip to content

Commit

Permalink
Pipe: release resources of pipe event that have been GCed but the ref…
Browse files Browse the repository at this point in the history
…erence count is not zero by PhantomReference (#13360)
  • Loading branch information
VGalaxies authored Oct 13, 2024
1 parent 207f90a commit 29ae643
Show file tree
Hide file tree
Showing 25 changed files with 664 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand All @@ -45,6 +46,9 @@ public class PipeConfigNodeRuntimeAgent implements IService {

private final AtomicBoolean isShutdown = new AtomicBoolean(false);

private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();

@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
Expand All @@ -58,6 +62,9 @@ public synchronized void start() {
// Clean receiver file dir
PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();

// Start periodical job executor
pipePeriodicalJobExecutor.start();

isShutdown.set(false);
LOGGER.info("PipeRuntimeConfigNodeAgent started");
}
Expand All @@ -69,6 +76,9 @@ public synchronized void stop() {
}
isShutdown.set(true);

// Stop periodical job executor
pipePeriodicalJobExecutor.stop();

PipeConfigNodeAgent.task().dropAllPipeTasks();

LOGGER.info("PipeRuntimeConfigNodeAgent stopped");
Expand Down Expand Up @@ -143,4 +153,10 @@ private void report(
PipeConfigNodeAgent.task().stopAllPipesWithCriticalException();
}
}

/////////////////////////// Periodical Job Executor ///////////////////////////

public void registerPeriodicalJob(String id, Runnable periodicalJob, long intervalInSeconds) {
pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;

import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
Expand All @@ -42,9 +45,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent
implements ReferenceTrackableEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigRegionSnapshotEvent.class);
private String snapshotPath;
Expand Down Expand Up @@ -259,4 +265,53 @@ public String coreReportMessage() {
+ " - "
+ super.coreReportMessage();
}

/////////////////////////// ReferenceTrackableEvent ///////////////////////////

@Override
protected void trackResource() {
PipeConfigNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder());
}

@Override
public PipeEventResource eventResourceBuilder() {
return new PipeConfigRegionSnapshotEventResource(
this.isReleased,
this.referenceCount,
this.resourceManager,
this.snapshotPath,
this.templateFilePath);
}

private static class PipeConfigRegionSnapshotEventResource extends PipeEventResource {

private final PipeSnapshotResourceManager resourceManager;
private final String snapshotPath;
private final String templateFilePath;

private PipeConfigRegionSnapshotEventResource(
final AtomicBoolean isReleased,
final AtomicInteger referenceCount,
final PipeSnapshotResourceManager resourceManager,
final String snapshotPath,
final String templateFilePath) {
super(isReleased, referenceCount);
this.resourceManager = resourceManager;
this.snapshotPath = snapshotPath;
this.templateFilePath = templateFilePath;
}

@Override
protected void finalizeResource() {
try {
resourceManager.decreaseSnapshotReference(snapshotPath);
if (!templateFilePath.isEmpty()) {
resourceManager.decreaseSnapshotReference(templateFilePath);
}
} catch (final Exception e) {
LOGGER.warn(
String.format("Decrease reference count for snapshot %s error.", snapshotPath), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public void bindTo(final AbstractMetricService metricService) {
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService);
}

@Override
Expand All @@ -55,5 +56,6 @@ public void unbindFrom(final AbstractMetricService metricService) {
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.metric;

import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;

public class PipeConfigNodeResourceMetrics implements IMetricSet {

//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////

@Override
public void bindTo(final AbstractMetricService metricService) {
// phantom reference count
metricService.createAutoGauge(
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
MetricLevel.IMPORTANT,
PipeConfigNodeResourceManager.ref(),
PipePhantomReferenceManager::getPhantomReferenceCount);
}

@Override
public void unbindFrom(final AbstractMetricService metricService) {
// phantom reference count
metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
}

//////////////////////////// singleton ////////////////////////////

private static class PipeConfigNodeResourceMetricsHolder {

private static final PipeConfigNodeResourceMetrics INSTANCE =
new PipeConfigNodeResourceMetrics();

private PipeConfigNodeResourceMetricsHolder() {
// empty constructor
}
}

public static PipeConfigNodeResourceMetrics getInstance() {
return PipeConfigNodeResourceMetrics.PipeConfigNodeResourceMetricsHolder.INSTANCE;
}

private PipeConfigNodeResourceMetrics() {
// empty constructor
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package org.apache.iotdb.confignode.manager.pipe.resource;

import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
import org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager;

public class PipeConfigNodeResourceManager {

private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
private final PipeLogManager pipeLogManager;
private final PipePhantomReferenceManager pipePhantomReferenceManager;

public static PipeSnapshotResourceManager snapshot() {
return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE
Expand All @@ -36,11 +39,16 @@ public static PipeLogManager log() {
return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager;
}

public static PipePhantomReferenceManager ref() {
return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager;
}

///////////////////////////// SINGLETON /////////////////////////////

private PipeConfigNodeResourceManager() {
pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager();
pipeLogManager = new PipeLogManager();
pipePhantomReferenceManager = new PipeConfigNodePhantomReferenceManager();
}

private static class PipeResourceManagerHolder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.resource.ref;

import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;

public class PipeConfigNodePhantomReferenceManager extends PipePhantomReferenceManager {

public PipeConfigNodePhantomReferenceManager() {
super();

PipeConfigNodeAgent.runtime()
.registerPeriodicalJob(
"PipePhantomReferenceManager#gcHook()",
super::gcHook,
PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.event;

import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;

public interface ReferenceTrackableEvent {

PipeEventResource eventResourceBuilder();
}
Loading

0 comments on commit 29ae643

Please sign in to comment.