Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load: Add some load metrics of time cost, write point and disk throughput #12735

Merged
merged 14 commits into from
Jun 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);

// Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
Expand All @@ -431,7 +432,9 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;

import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
Expand All @@ -37,6 +38,8 @@ public class LoadTsFileRateLimiter {
private final RateLimiter loadWriteRateLimiter;

public void acquire(long bytes) {
LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes);

if (reloadParams()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.metric.load;

import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;

import java.util.Arrays;

public class LoadTsFileCostMetricsSet implements IMetricSet {

private static final LoadTsFileCostMetricsSet INSTANCE = new LoadTsFileCostMetricsSet();

public static final String ANALYSIS = "analysis";
public static final String FIRST_PHASE = "first_phase";
public static final String SECOND_PHASE = "second_phase";
public static final String LOAD_LOCALLY = "load_locally";

private LoadTsFileCostMetricsSet() {
// empty constructor
}

private Timer analyzerTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;

private Rate diskIORate = DoNothingMetricManager.DO_NOTHING_RATE;

public void recordPhaseTimeCost(String stage, long costTimeInNanos) {
switch (stage) {
case ANALYSIS:
analyzerTimer.updateNanos(costTimeInNanos);
break;
case FIRST_PHASE:
firstPhaseTimer.updateNanos(costTimeInNanos);
break;
case SECOND_PHASE:
secondPhaseTimer.updateNanos(costTimeInNanos);
break;
case LOAD_LOCALLY:
loadLocallyTimer.updateNanos(costTimeInNanos);
break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
}
}

public void recordDiskIO(long bytes) {
diskIORate.mark(bytes);
}

@Override
public void bindTo(AbstractMetricService metricService) {
analyzerTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), ANALYSIS);
firstPhaseTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
FIRST_PHASE);
secondPhaseTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
SECOND_PHASE);
loadLocallyTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
LOAD_LOCALLY);

diskIORate =
metricService.getOrCreateRate(
Metric.LOAD_DISK_IO.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
"DataNode " + IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
}

@Override
public void unbindFrom(AbstractMetricService metricService) {
Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY)
.forEach(
stage ->
metricService.remove(
MetricType.TIMER,
Metric.LOAD_TIME_COST.toString(),
Tag.NAME.toString(),
stage));

metricService.remove(
MetricType.RATE,
Metric.LOAD_DISK_IO.toString(),
Tag.NAME.toString(),
String.valueOf(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));
}

public static LoadTsFileCostMetricsSet getInstance() {
return LoadTsFileCostMetricsSet.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.db.queryengine.metric;
package org.apache.iotdb.db.queryengine.metric.load;

import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
Expand Down Expand Up @@ -186,6 +187,7 @@
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
import static org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet.ANALYSIS;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
Expand Down Expand Up @@ -2846,6 +2848,7 @@ private InsertBaseStatement removeLogicalView(

@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
long startTime = System.nanoTime();
try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)) {
return loadTsfileAnalyzer.analyzeFileByFile();
Expand All @@ -2860,6 +2863,9 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
return analysis;
} finally {
LoadTsFileCostMetricsSet.getInstance()
.recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
Expand Down Expand Up @@ -108,6 +109,9 @@ public class LoadTsFileScheduler implements IScheduler {

private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET =
LoadTsFileCostMetricsSet.getInstance();

private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2;
private static final int TRANSMIT_LIMIT =
Expand Down Expand Up @@ -177,16 +181,38 @@ public void start() {
partitionFetcher.queryDataPartition(
slotList,
queryContext.getSession().getUserName()))) { // do not decode, load locally
isLoadSingleTsFileSuccess = loadLocally(node);
final long startTime = System.nanoTime();
try {
isLoadSingleTsFileSuccess = loadLocally(node);
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() - startTime);
}

node.clean();
} else { // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();

boolean isFirstPhaseSuccess = firstPhase(node);
boolean isSecondPhaseSuccess =
secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());
long startTime = System.nanoTime();
final boolean isFirstPhaseSuccess;
try {
isFirstPhaseSuccess = firstPhase(node);
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime);
}

startTime = System.nanoTime();
final boolean isSecondPhaseSuccess;
try {
isSecondPhaseSuccess =
secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.SECOND_PHASE, System.nanoTime() - startTime);
}

node.clean();
if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
Expand Down Expand Up @@ -433,7 +459,9 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
if (!node.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
Expand All @@ -445,7 +473,9 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
Expand Down Expand Up @@ -93,6 +94,9 @@ public static void bind() {

// bind subscription related metrics
MetricService.getInstance().addMetricSet(SubscriptionMetrics.getInstance());

// bind load related metrics
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
}

private static void initSystemMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ public enum Metric {
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
// load related
LOAD_MEM("load_mem");
LOAD_MEM("load_mem"),
LOAD_DISK_IO("load_disk_io"),
LOAD_TIME_COST("load_time_cost"),
LOAD_POINT_COUNT("load_point_count"),
;

final String value;

Expand Down
Loading