Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: release resources of pipe event that have been GCed but the reference count is not zero by PhantomReference #13360

Merged
merged 31 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
88a6a60
basic setup for tsfile event
VGalaxies Aug 31, 2024
a365043
minor improve
VGalaxies Sep 2, 2024
184039c
Merge branch 'improve-pipe-ref' into PipePhantomReference
VGalaxies Sep 2, 2024
e96a94b
refactor
VGalaxies Sep 3, 2024
b384570
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 3, 2024
5a796c0
refact
VGalaxies Sep 5, 2024
4db5253
intro config
VGalaxies Sep 5, 2024
f49007a
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 5, 2024
e679523
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 6, 2024
9189fff
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 7, 2024
5ed4eb1
refactor
VGalaxies Sep 9, 2024
aea9d16
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 13, 2024
9c09937
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 18, 2024
ddfa3ae
metrics integration
VGalaxies Sep 19, 2024
e0cae62
gc hook interval config
VGalaxies Sep 19, 2024
f7a4c09
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 20, 2024
dff6750
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 20, 2024
fede332
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 22, 2024
dfe4383
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 23, 2024
252e60a
fixup
VGalaxies Sep 23, 2024
8bd3ace
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 23, 2024
8423c68
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 24, 2024
8f495c0
Merge branch 'master' into PipePhantomReference
VGalaxies Sep 24, 2024
da7c416
Merge branch 'master' into PipePhantomReference
VGalaxies Oct 10, 2024
a1f8712
Merge branch 'master' into PipePhantomReference
VGalaxies Oct 10, 2024
add7302
rm SubscriptionPerfTest
VGalaxies Oct 10, 2024
a00f1db
Merge branch 'master' into PipePhantomReference
VGalaxies Oct 11, 2024
f5c5a37
Merge branch 'master' into PipePhantomReference
VGalaxies Oct 12, 2024
ca8ed2b
fix code review
VGalaxies Oct 12, 2024
7d9166f
Merge branch 'master' into PipePhantomReference
VGalaxies Oct 12, 2024
92263c9
disable by default
VGalaxies Oct 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand All @@ -45,6 +46,9 @@ public class PipeConfigNodeRuntimeAgent implements IService {

private final AtomicBoolean isShutdown = new AtomicBoolean(false);

private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();

@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
Expand All @@ -58,6 +62,9 @@ public synchronized void start() {
// Clean receiver file dir
PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();

// Start periodical job executor
pipePeriodicalJobExecutor.start();

isShutdown.set(false);
LOGGER.info("PipeRuntimeConfigNodeAgent started");
}
Expand All @@ -69,6 +76,9 @@ public synchronized void stop() {
}
isShutdown.set(true);

// Stop periodical job executor
pipePeriodicalJobExecutor.stop();

PipeConfigNodeAgent.task().dropAllPipeTasks();

LOGGER.info("PipeRuntimeConfigNodeAgent stopped");
Expand Down Expand Up @@ -143,4 +153,10 @@ private void report(
PipeConfigNodeAgent.task().stopAllPipesWithCriticalException();
}
}

/////////////////////////// Periodical Job Executor ///////////////////////////

public void registerPeriodicalJob(String id, Runnable periodicalJob, long intervalInSeconds) {
pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;

import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
Expand All @@ -42,9 +45,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent
implements ReferenceTrackableEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigRegionSnapshotEvent.class);
private String snapshotPath;
Expand Down Expand Up @@ -259,4 +265,53 @@ public String coreReportMessage() {
+ " - "
+ super.coreReportMessage();
}

/////////////////////////// ReferenceTrackableEvent ///////////////////////////

@Override
protected void trackResource() {
PipeConfigNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder());
}

@Override
public PipeEventResource eventResourceBuilder() {
return new PipeConfigRegionSnapshotEventResource(
this.isReleased,
this.referenceCount,
this.resourceManager,
this.snapshotPath,
this.templateFilePath);
}

private static class PipeConfigRegionSnapshotEventResource extends PipeEventResource {

private final PipeSnapshotResourceManager resourceManager;
private final String snapshotPath;
private final String templateFilePath;

private PipeConfigRegionSnapshotEventResource(
final AtomicBoolean isReleased,
final AtomicInteger referenceCount,
final PipeSnapshotResourceManager resourceManager,
final String snapshotPath,
final String templateFilePath) {
super(isReleased, referenceCount);
this.resourceManager = resourceManager;
this.snapshotPath = snapshotPath;
this.templateFilePath = templateFilePath;
}

@Override
protected void finalizeResource() {
try {
resourceManager.decreaseSnapshotReference(snapshotPath);
if (!templateFilePath.isEmpty()) {
resourceManager.decreaseSnapshotReference(templateFilePath);
}
} catch (final Exception e) {
LOGGER.warn(
String.format("Decrease reference count for snapshot %s error.", snapshotPath), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public void bindTo(final AbstractMetricService metricService) {
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService);
}

@Override
Expand All @@ -55,5 +56,6 @@ public void unbindFrom(final AbstractMetricService metricService) {
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.metric;

import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;

public class PipeConfigNodeResourceMetrics implements IMetricSet {

//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////

@Override
public void bindTo(final AbstractMetricService metricService) {
// phantom reference count
metricService.createAutoGauge(
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString(),
MetricLevel.IMPORTANT,
PipeConfigNodeResourceManager.ref(),
PipePhantomReferenceManager::getPhantomReferenceCount);
}

@Override
public void unbindFrom(final AbstractMetricService metricService) {
// phantom reference count
metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
}

//////////////////////////// singleton ////////////////////////////

private static class PipeConfigNodeResourceMetricsHolder {

private static final PipeConfigNodeResourceMetrics INSTANCE =
new PipeConfigNodeResourceMetrics();

private PipeConfigNodeResourceMetricsHolder() {
// empty constructor
}
}

public static PipeConfigNodeResourceMetrics getInstance() {
return PipeConfigNodeResourceMetrics.PipeConfigNodeResourceMetricsHolder.INSTANCE;
}

private PipeConfigNodeResourceMetrics() {
// empty constructor
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package org.apache.iotdb.confignode.manager.pipe.resource;

import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager;
import org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager;

public class PipeConfigNodeResourceManager {

private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
private final PipeLogManager pipeLogManager;
private final PipePhantomReferenceManager pipePhantomReferenceManager;

public static PipeSnapshotResourceManager snapshot() {
return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE
Expand All @@ -36,11 +39,16 @@ public static PipeLogManager log() {
return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager;
}

public static PipePhantomReferenceManager ref() {
return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager;
}

///////////////////////////// SINGLETON /////////////////////////////

private PipeConfigNodeResourceManager() {
pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager();
pipeLogManager = new PipeLogManager();
pipePhantomReferenceManager = new PipeConfigNodePhantomReferenceManager();
}

private static class PipeResourceManagerHolder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.resource.ref;

import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;

public class PipeConfigNodePhantomReferenceManager extends PipePhantomReferenceManager {

public PipeConfigNodePhantomReferenceManager() {
super();

PipeConfigNodeAgent.runtime()
.registerPeriodicalJob(
"PipePhantomReferenceManager#gcHook()",
super::gcHook,
PipeConfig.getInstance().getPipeEventReferenceEliminateIntervalSeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.event;

import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;

public interface ReferenceTrackableEvent {

PipeEventResource eventResourceBuilder();
}
Loading
Loading