Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt partition computation for IDeviceID #12700

Merged
merged 10 commits into from
Jun 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -61,6 +63,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -345,12 +348,12 @@ public void testGetSlots() throws Exception {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";

final String d00 = sg0 + ".d0.s";
final String d01 = sg0 + ".d1.s";
final String d10 = sg1 + ".d0.s";
final String d11 = sg1 + ".d1.s";
final IDeviceID d00 = Factory.DEFAULT_FACTORY.create(sg0 + ".d0.s");
final IDeviceID d01 = Factory.DEFAULT_FACTORY.create(sg0 + ".d1.s");
final IDeviceID d10 = Factory.DEFAULT_FACTORY.create(sg1 + ".d0.s");
final IDeviceID d11 = Factory.DEFAULT_FACTORY.create(sg1 + ".d1.s");

String[] devices = new String[] {d00, d01, d10, d11};
IDeviceID[] devices = new IDeviceID[] {d00, d01, d10, d11};

try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
Expand Down Expand Up @@ -430,8 +433,10 @@ public void testGetSlots() throws Exception {
// Get RegionId with device

TGetRegionIdReq deviceReq = new TGetRegionIdReq(TConsensusGroupType.DataRegion);
for (String device : devices) {
deviceReq.setDevice(device);
for (IDeviceID device : devices) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
device.serialize(baos);
deviceReq.setDevice(baos.toByteArray());
TGetRegionIdResp resp = client.getRegionId(deviceReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -691,7 +692,7 @@ private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, Partia
return new ArrayList<>();
}
return Collections.singletonList(
getPartitionManager().getSeriesPartitionSlot(innerPath.getIDeviceID().toString()));
getPartitionManager().getSeriesPartitionSlot(innerPath.getIDeviceID()));
}

@Override
Expand Down Expand Up @@ -760,17 +761,17 @@ public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patt
return resp.setStatus(status);
}

List<String> devicePaths = patternTree.getAllDevicePatterns();
List<IDeviceID> devicePaths = patternTree.getAllDevicePatterns();
List<String> databases = getClusterSchemaManager().getDatabaseNames();

// Build GetOrCreateSchemaPartitionPlan
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
for (String devicePath : devicePaths) {
for (IDeviceID deviceID : devicePaths) {
for (String database : databases) {
if (PathUtils.isStartWith(devicePath, database)) {
if (PathUtils.isStartWith(deviceID, database)) {
partitionSlotsMap
.computeIfAbsent(database, key -> new ArrayList<>())
.add(getPartitionManager().getSeriesPartitionSlot(devicePath));
.add(getPartitionManager().getSeriesPartitionSlot(deviceID));
break;
}
}
Expand All @@ -790,11 +791,11 @@ public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patt
}

private void printNewCreatedSchemaPartition(
List<String> devicePaths, TSchemaPartitionTableResp resp) {
List<IDeviceID> deviceIDS, TSchemaPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
StringBuilder devicePathString = new StringBuilder("{");
for (String devicePath : devicePaths) {
devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
for (IDeviceID deviceID : deviceIDS) {
devicePathString.append(lineSeparator).append("\t").append(deviceID).append(",");
}
devicePathString.append(lineSeparator).append("}");

Expand Down Expand Up @@ -865,8 +866,8 @@ private void printNodePathsPartition(
final String lineSeparator = System.lineSeparator();

StringBuilder devicePathString = new StringBuilder("{");
for (String devicePath : scope.getAllDevicePatterns()) {
devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
for (IDeviceID deviceID : scope.getAllDevicePatterns()) {
devicePathString.append(lineSeparator).append("\t").append(deviceID).append(",");
}
devicePathString.append(lineSeparator).append("}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Deserializer;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1004,11 +1007,11 @@ public boolean isDatabasePreDeleted(String database) {
/**
* Get TSeriesPartitionSlot.
*
* @param devicePath Full path ending with device name
* @param deviceID
* @return SeriesPartitionSlot
*/
public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) {
return executor.getSeriesPartitionSlot(devicePath);
public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) {
return executor.getSeriesPartitionSlot(deviceID);
}

public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) {
Expand Down Expand Up @@ -1083,8 +1086,10 @@ public GetRegionIdResp getRegionId(TGetRegionIdReq req) {
if (req.isSetDatabase()) {
plan.setDatabase(req.getDatabase());
} else {
plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice()));
plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice()));
IDeviceID deviceID =
Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(ByteBuffer.wrap(req.getDevice()));
plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(deviceID));
plan.setSeriesSlotId(executor.getSeriesPartitionSlot(deviceID));
}
if (Objects.equals(plan.getDatabase(), "")) {
// Return empty result if Database not specified
Expand Down Expand Up @@ -1118,8 +1123,10 @@ public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
if (req.isSetDatabase()) {
plan.setDatabase(req.getDatabase());
} else if (req.isSetDevice()) {
plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice()));
plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice()));
IDeviceID deviceID =
Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(ByteBuffer.wrap(req.getDevice()));
plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(deviceID));
plan.setSeriesSlotId(executor.getSeriesPartitionSlot(deviceID));
if (Objects.equals(plan.getDatabase(), "")) {
// Return empty result if Database not specified
return new GetTimeSlotListResp(RpcUtils.SUCCESS_STATUS, new ArrayList<>());
Expand All @@ -1145,8 +1152,10 @@ public CountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) {
if (req.isSetDatabase()) {
plan.setDatabase(req.getDatabase());
} else if (req.isSetDevice()) {
plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice()));
plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice()));
IDeviceID deviceID =
Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(ByteBuffer.wrap(req.getDevice()));
plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(deviceID));
plan.setSeriesSlotId(executor.getSeriesPartitionSlot(deviceID));
if (Objects.equals(plan.getDatabase(), "")) {
// Return empty result if Database not specified
return new CountTimeSlotListResp(RpcUtils.SUCCESS_STATUS, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -610,10 +611,10 @@ public TDatabaseSchema getDatabaseSchemaByName(String database)
*
* @return The DatabaseName of the specified Device. Empty String if not exists.
*/
public String getDatabaseNameByDevice(String devicePath) {
public String getDatabaseNameByDevice(IDeviceID deviceID) {
List<String> databases = getDatabaseNames();
for (String database : databases) {
if (PathUtils.isStartWith(devicePath, database)) {
if (PathUtils.isStartWith(deviceID, database)) {
return database;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -41,9 +44,9 @@ public class DeviceGroupHashExecutorManualTest {
private static final String chars =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-";

private List<String> genBatchDevices() {
private List<IDeviceID> genBatchDevices() {
Random random = new Random();
List<String> devices = new ArrayList<>();
List<IDeviceID> devices = new ArrayList<>();
int fatherLength = random.nextInt(10) + 10;
int deviceLength = random.nextInt(5) + 5;

Expand All @@ -56,7 +59,7 @@ private List<String> genBatchDevices() {
for (int k = 0; k < deviceLength; k++) {
curDevice.append(chars.charAt(random.nextInt(chars.length())));
}
devices.add(curDevice.toString());
devices.add(Factory.DEFAULT_FACTORY.create(curDevice.toString()));
}
return devices;
}
Expand All @@ -68,9 +71,9 @@ public void GeneralIndexTest() throws IOException {

long totalTime = 0;
for (int i = 0; i < batchCount; i++) {
List<String> devices = genBatchDevices();
List<IDeviceID> devices = genBatchDevices();
totalTime -= System.currentTimeMillis();
for (String device : devices) {
for (IDeviceID device : devices) {
bucket[manager.getSeriesPartitionSlot(device).getSlotId()] += 1;
}
totalTime += System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.block.TsBlock;
Expand Down Expand Up @@ -673,7 +674,7 @@ private TSExecuteStatementResp executeAggregationQueryInternal(

private List<TsBlock> executeGroupByQueryInternal(
SessionInfo sessionInfo,
String device,
IDeviceID deviceID,
String measurement,
TSDataType dataType,
boolean isAligned,
Expand Down Expand Up @@ -736,7 +737,7 @@ private List<TsBlock> executeGroupByQueryInternal(
if (isAligned) {
path =
new AlignedFullPath(
Factory.DEFAULT_FACTORY.create(device),
deviceID,
Collections.singletonList(measurement),
Collections.singletonList(measurementSchema));
operator =
Expand All @@ -754,7 +755,7 @@ private List<TsBlock> executeGroupByQueryInternal(
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
&& !TAggregationType.FIRST_VALUE.equals(aggregationType)));
} else {
path = new NonAlignedFullPath(Factory.DEFAULT_FACTORY.create(device), measurementSchema);
path = new NonAlignedFullPath(deviceID, measurementSchema);
operator =
new SeriesAggregationScanOperator(
planNodeId,
Expand Down Expand Up @@ -827,28 +828,30 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
Throwable t = null;
try {
String db;
String deviceId;
String device;
PartialPath devicePath;

queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);

if (req.isLegalPathNodes()) {
db = req.db;
deviceId = req.deviceId;
devicePath = new PartialPath(deviceId.split("\\."));
device = req.deviceId;
devicePath = new PartialPath(device.split("\\."));
} else {
db = new PartialPath(req.db).getFullPath();
devicePath = new PartialPath(req.deviceId);
deviceId = devicePath.getFullPath();
device = devicePath.getFullPath();
}

IDeviceID deviceID = Factory.DEFAULT_FACTORY.create(device);

DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true);
new DataPartitionQueryParam(deviceID, Collections.emptyList(), true, true);
DataPartition dataPartition =
partitionFetcher.getDataPartitionWithUnclosedTimeRange(
Collections.singletonMap(db, Collections.singletonList(queryParam)));
List<TRegionReplicaSet> regionReplicaSets =
dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null);
dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceID, null);

// no valid DataRegion
if (regionReplicaSets.isEmpty()
Expand Down Expand Up @@ -1041,7 +1044,7 @@ public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryInt
String[] splits = Strings.split(req.getDevice(), "\\.");
database = String.format("%s.%s", splits[0], splits[1]);
}
String deviceId = req.getDevice();
IDeviceID deviceId = Factory.DEFAULT_FACTORY.create(req.getDevice());
String measurementId = req.getMeasurement();
TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
Expand Down Expand Up @@ -533,27 +534,27 @@ public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOEx
}

/**
* Get database name by path
* Get database name by device path
*
* <p>e.g., root.sg1 is a database and path = root.sg1.d1, return root.sg1
* <p>e.g., root.sg1 is a database and device path = root.sg1.d1, return root.sg1
*
* @param pathName only full path, cannot be path pattern
* @param deviceID only full device path, cannot be path pattern
* @return database in the given path
* @throws SemanticException no matched database
*/
@Override
public String getBelongedDatabase(String pathName) {
public String getBelongedDatabase(IDeviceID deviceID) {
for (String database : databases) {
if (PathUtils.isStartWith(pathName, database)) {
if (PathUtils.isStartWith(deviceID, database)) {
return database;
}
}
throw new SemanticException("No matched database. Please check the path " + pathName);
throw new SemanticException("No matched database. Please check the path " + deviceID);
}

@Override
public String getBelongedDatabase(PartialPath path) {
return getBelongedDatabase(path.getFullPath());
public String getBelongedDatabase(PartialPath devicePath) {
return getBelongedDatabase(devicePath.getIDeviceIDAsFullDevice());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;

import java.util.List;
Expand Down Expand Up @@ -60,14 +61,14 @@ Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
DeviceSchemaInfo searchDeviceSchemaInfo(PartialPath devicePath, List<String> measurements);

/**
* Get database name by path
* Get database name by device path
*
* <p>e.g., root.sg1 is a database and path = root.sg1.d1, return root.sg1
* <p>e.g., root.sg1 is a database and device path = root.sg1.d1, return root.sg1
*
* @param pathName only full path, cannot be path pattern
* @param pathName only full device path, cannot be path pattern
* @return database in the given path
*/
String getBelongedDatabase(String pathName);
String getBelongedDatabase(IDeviceID pathName);

String getBelongedDatabase(PartialPath path);

Expand Down
Loading
Loading