Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Load: Add some load metrics of time cost, write point and disk throughput #12735

Merged
merged 14 commits into from
Jun 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
Expand Down Expand Up @@ -85,6 +86,9 @@ public class LoadTsFileManager {
new AtomicReference<>(CONFIG.getLoadTsFileDirs());
private static final AtomicReference<FolderManager> FOLDER_MANAGER = new AtomicReference<>();

private static final LoadTsFileCostMetricsSet LOAD_TSFILE_METRICS =
LoadTsFileCostMetricsSet.getInstance();
MiniSho marked this conversation as resolved.
Show resolved Hide resolved

private final Map<String, TsFileWriterManager> uuid2WriterManager = new ConcurrentHashMap<>();

private final Map<String, CleanupTask> uuid2CleanupTask = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -412,6 +416,7 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);

// Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
Expand All @@ -431,7 +436,9 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_TSFILE.toString());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.metric;

import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;

import java.util.Arrays;

public class LoadTsFileCostMetricsSet implements IMetricSet {
private static final LoadTsFileCostMetricsSet INSTANCE = new LoadTsFileCostMetricsSet();

public static final String ANALYSIS = "analysis";
public static final String SPLIT = "split";
public static final String WRITE = "write";
MiniSho marked this conversation as resolved.
Show resolved Hide resolved

private LoadTsFileCostMetricsSet() {
// empty constructor
}

private Timer analyzerTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer splitTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer writeTimer = DoNothingMetricManager.DO_NOTHING_TIMER;

public void recordCost(String stage, long costTimeInNanos) {
switch (stage) {
case ANALYSIS:
analyzerTimer.updateNanos(costTimeInNanos);
break;
case SPLIT:
splitTimer.updateNanos(costTimeInNanos);
break;
case WRITE:
writeTimer.updateNanos(costTimeInNanos);
break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
}
}

@Override
public void bindTo(AbstractMetricService metricService) {
analyzerTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), ANALYSIS);
splitTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), SPLIT);
writeTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), WRITE);
}

@Override
public void unbindFrom(AbstractMetricService metricService) {
Arrays.asList(ANALYSIS, SPLIT, WRITE)
.forEach(
stage ->
metricService.remove(
MetricType.TIMER,
Metric.LOAD_TIME_COST.toString(),
Tag.NAME.toString(),
stage));
}

public static LoadTsFileCostMetricsSet getInstance() {
return LoadTsFileCostMetricsSet.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
Expand Down Expand Up @@ -182,6 +183,7 @@
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_PATTERN;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static org.apache.iotdb.db.queryengine.metric.LoadTsFileCostMetricsSet.ANALYSIS;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
Expand Down Expand Up @@ -2844,6 +2846,7 @@ private InsertBaseStatement removeLogicalView(

@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
long startTime = System.nanoTime();
try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)) {
return loadTsfileAnalyzer.analyzeFileByFile();
Expand All @@ -2858,6 +2861,8 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
return analysis;
} finally {
LoadTsFileCostMetricsSet.getInstance().recordCost(ANALYSIS, System.nanoTime() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
Expand Down Expand Up @@ -112,6 +113,8 @@ public class LoadTsFileScheduler implements IScheduler {
IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2;
private static final int TRANSMIT_LIMIT =
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET =
LoadTsFileCostMetricsSet.getInstance();

private static final Set<String> LOADING_FILE_SET = new HashSet<>();

Expand Down Expand Up @@ -230,6 +233,7 @@ public void start() {

private boolean firstPhase(LoadSingleTsFileNode node) {
final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block);
long startTime = System.nanoTime();
try {
new TsFileSplitter(
node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
Expand All @@ -253,6 +257,9 @@ private boolean firstPhase(LoadSingleTsFileNode node) {
return false;
} finally {
tsFileDataManager.clear();

LOAD_TSFILE_COST_METRICS_SET.recordCost(
LoadTsFileCostMetricsSet.SPLIT, System.nanoTime() - startTime);
}
return true;
}
Expand Down Expand Up @@ -384,6 +391,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
throw new LoadReadOnlyException();
}

long startTime = System.nanoTime();
try {
FragmentInstance instance =
new FragmentInstance(
Expand All @@ -405,6 +413,9 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
e.getFailureStatus().getMessage()));
stateMachine.transitionToFailed(e.getFailureStatus());
return false;
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordCost(
LoadTsFileCostMetricsSet.WRITE, System.nanoTime() - startTime);
}

// add metrics
Expand Down Expand Up @@ -433,7 +444,9 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_TSFILE.toString());
if (!node.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
Expand All @@ -445,7 +458,9 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_TSFILE.toString());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
Expand Down Expand Up @@ -93,6 +94,9 @@ public static void bind() {

// bind subscription related metrics
MetricService.getInstance().addMetricSet(SubscriptionMetrics.getInstance());

// bind load related metrics
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
}

private static void initSystemMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileRateLimiter;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
Expand Down Expand Up @@ -116,6 +117,9 @@ public class StorageEngine implements IService {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();

private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET =
LoadTsFileCostMetricsSet.getInstance();

/**
* a folder (system/databases/ by default) that persist system info. Each database will have a
* subfolder under the systemDir.
Expand Down Expand Up @@ -871,7 +875,7 @@ public TSStatus executeLoadCommand(
boolean isGeneratedByPipe,
ProgressIndex progressIndex) {
TSStatus status = new TSStatus();

long startTime = System.nanoTime();
try {
switch (loadCommand) {
case EXECUTE:
Expand Down Expand Up @@ -904,6 +908,9 @@ public TSStatus executeLoadCommand(
LOGGER.error("Execute load command {} error.", loadCommand, e);
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordCost(
LoadTsFileCostMetricsSet.WRITE, System.nanoTime() - startTime);
}

return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ public enum Metric {
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
// load related
LOAD_MEM("load_mem");
LOAD_TSFILE("load_tsfile"),
LOAD_MEM("load_mem"),
LOAD_TIME_COST("load_time_cost");
MiniSho marked this conversation as resolved.
Show resolved Hide resolved

final String value;

Expand Down
Loading