Skip to content

Commit

Permalink
update parititionCache
Browse files Browse the repository at this point in the history
Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum committed Jun 12, 2024
1 parent 16cb5dc commit 57e964a
Show file tree
Hide file tree
Showing 40 changed files with 340 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -691,7 +692,7 @@ private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, Partia
return new ArrayList<>();
}
return Collections.singletonList(
getPartitionManager().getSeriesPartitionSlot(innerPath.getIDeviceID().toString()));
getPartitionManager().getSeriesPartitionSlot(innerPath.getIDeviceID()));
}

@Override
Expand Down Expand Up @@ -760,17 +761,17 @@ public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patt
return resp.setStatus(status);
}

List<String> devicePaths = patternTree.getAllDevicePatterns();
List<IDeviceID> devicePaths = patternTree.getAllDevicePatterns();
List<String> databases = getClusterSchemaManager().getDatabaseNames();

// Build GetOrCreateSchemaPartitionPlan
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
for (String devicePath : devicePaths) {
for (IDeviceID deviceID : devicePaths) {
for (String database : databases) {
if (PathUtils.isStartWith(devicePath, database)) {
if (PathUtils.isStartWith(deviceID, database)) {
partitionSlotsMap
.computeIfAbsent(database, key -> new ArrayList<>())
.add(getPartitionManager().getSeriesPartitionSlot(devicePath));
.add(getPartitionManager().getSeriesPartitionSlot(deviceID));
break;
}
}
Expand All @@ -790,11 +791,11 @@ public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patt
}

private void printNewCreatedSchemaPartition(
List<String> devicePaths, TSchemaPartitionTableResp resp) {
List<IDeviceID> deviceIDS, TSchemaPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
StringBuilder devicePathString = new StringBuilder("{");
for (String devicePath : devicePaths) {
devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
for (IDeviceID deviceID : deviceIDS) {
devicePathString.append(lineSeparator).append("\t").append(deviceID).append(",");
}
devicePathString.append(lineSeparator).append("}");

Expand Down Expand Up @@ -865,8 +866,8 @@ private void printNodePathsPartition(
final String lineSeparator = System.lineSeparator();

StringBuilder devicePathString = new StringBuilder("{");
for (String devicePath : scope.getAllDevicePatterns()) {
devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
for (IDeviceID deviceID : scope.getAllDevicePatterns()) {
devicePathString.append(lineSeparator).append("\t").append(deviceID).append(",");
}
devicePathString.append(lineSeparator).append("}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,11 @@ public boolean isDatabasePreDeleted(String database) {
/**
* Get TSeriesPartitionSlot.
*
* @param devicePath Full path ending with device name
* @param deviceID
* @return SeriesPartitionSlot
*/
public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) {
return executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath));
public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) {
return executor.getSeriesPartitionSlot(deviceID);
}

public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -41,9 +44,9 @@ public class DeviceGroupHashExecutorManualTest {
private static final String chars =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-";

private List<String> genBatchDevices() {
private List<IDeviceID> genBatchDevices() {
Random random = new Random();
List<String> devices = new ArrayList<>();
List<IDeviceID> devices = new ArrayList<>();
int fatherLength = random.nextInt(10) + 10;
int deviceLength = random.nextInt(5) + 5;

Expand All @@ -56,7 +59,7 @@ private List<String> genBatchDevices() {
for (int k = 0; k < deviceLength; k++) {
curDevice.append(chars.charAt(random.nextInt(chars.length())));
}
devices.add(curDevice.toString());
devices.add(Factory.DEFAULT_FACTORY.create(curDevice.toString()));
}
return devices;
}
Expand All @@ -68,9 +71,9 @@ public void GeneralIndexTest() throws IOException {

long totalTime = 0;
for (int i = 0; i < batchCount; i++) {
List<String> devices = genBatchDevices();
List<IDeviceID> devices = genBatchDevices();
totalTime -= System.currentTimeMillis();
for (String device : devices) {
for (IDeviceID device : devices) {
bucket[manager.getSeriesPartitionSlot(device).getSlotId()] += 1;
}
totalTime += System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.block.TsBlock;
Expand Down Expand Up @@ -673,7 +674,7 @@ private TSExecuteStatementResp executeAggregationQueryInternal(

private List<TsBlock> executeGroupByQueryInternal(
SessionInfo sessionInfo,
String device,
IDeviceID deviceID,
String measurement,
TSDataType dataType,
boolean isAligned,
Expand Down Expand Up @@ -736,7 +737,7 @@ private List<TsBlock> executeGroupByQueryInternal(
if (isAligned) {
path =
new AlignedFullPath(
Factory.DEFAULT_FACTORY.create(device),
deviceID,
Collections.singletonList(measurement),
Collections.singletonList(measurementSchema));
operator =
Expand All @@ -754,7 +755,7 @@ private List<TsBlock> executeGroupByQueryInternal(
|| (!TAggregationType.LAST_VALUE.equals(aggregationType)
&& !TAggregationType.FIRST_VALUE.equals(aggregationType)));
} else {
path = new NonAlignedFullPath(Factory.DEFAULT_FACTORY.create(device), measurementSchema);
path = new NonAlignedFullPath(deviceID, measurementSchema);
operator =
new SeriesAggregationScanOperator(
planNodeId,
Expand Down Expand Up @@ -827,28 +828,30 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
Throwable t = null;
try {
String db;
String deviceId;
String device;
PartialPath devicePath;

queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);

if (req.isLegalPathNodes()) {
db = req.db;
deviceId = req.deviceId;
devicePath = new PartialPath(deviceId.split("\\."));
device = req.deviceId;
devicePath = new PartialPath(device.split("\\."));
} else {
db = new PartialPath(req.db).getFullPath();
devicePath = new PartialPath(req.deviceId);
deviceId = devicePath.getFullPath();
device = devicePath.getFullPath();
}

IDeviceID deviceID = Factory.DEFAULT_FACTORY.create(device);

DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true);
new DataPartitionQueryParam(deviceID, Collections.emptyList(), true, true);
DataPartition dataPartition =
partitionFetcher.getDataPartitionWithUnclosedTimeRange(
Collections.singletonMap(db, Collections.singletonList(queryParam)));
List<TRegionReplicaSet> regionReplicaSets =
dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null);
dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceID, null);

// no valid DataRegion
if (regionReplicaSets.isEmpty()
Expand Down Expand Up @@ -1041,7 +1044,7 @@ public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryInt
String[] splits = Strings.split(req.getDevice(), "\\.");
database = String.format("%s.%s", splits[0], splits[1]);
}
String deviceId = req.getDevice();
IDeviceID deviceId = Factory.DEFAULT_FACTORY.create(req.getDevice());
String measurementId = req.getMeasurement();
TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
Expand Down Expand Up @@ -551,6 +552,16 @@ public String getBelongedDatabase(String pathName) {
throw new SemanticException("No matched database. Please check the path " + pathName);
}

@Override
public String getBelongedDatabase(IDeviceID deviceID) {
for (String database : databases) {
if (PathUtils.isStartWith(deviceID, database)) {
return database;
}
}
throw new SemanticException("No matched database. Please check the path " + deviceID);
}

@Override
public String getBelongedDatabase(PartialPath path) {
return getBelongedDatabase(path.getFullPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;

import java.util.List;
Expand Down Expand Up @@ -69,6 +70,8 @@ Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
*/
String getBelongedDatabase(String pathName);

String getBelongedDatabase(IDeviceID pathName);

String getBelongedDatabase(PartialPath path);

Set<String> getDatabases();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -334,19 +335,19 @@ public Analysis() {

public List<TRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(
seriesPath.getIDeviceID().toString(), timefilter);
seriesPath.getIDeviceID(), timefilter);
}

public List<TRegionReplicaSet> getPartitionInfoByDevice(
PartialPath devicePath, Filter timefilter) {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(
devicePath.getFullPath(), timefilter);
devicePath.getIDeviceID(), timefilter);
}

public TRegionReplicaSet getPartitionInfo(
PartialPath seriesPath, TTimePartitionSlot tTimePartitionSlot) {
return dataPartition
.getDataRegionReplicaSet(seriesPath.getIDeviceID().toString(), tTimePartitionSlot)
.getDataRegionReplicaSet(seriesPath.getIDeviceID(), tTimePartitionSlot)
.get(0);
}

Expand All @@ -356,11 +357,11 @@ public TRegionReplicaSet getPartitionInfo(
*/
public List<List<TTimePartitionSlot>> getTimePartitionRange(
PartialPath seriesPath, Filter timefilter) {
return dataPartition.getTimePartitionRange(seriesPath.getIDeviceID().toString(), timefilter);
return dataPartition.getTimePartitionRange(seriesPath.getIDeviceID(), timefilter);
}

public List<TRegionReplicaSet> getPartitionInfo(String deviceName, Filter globalTimeFilter) {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceName, globalTimeFilter);
public List<TRegionReplicaSet> getPartitionInfo(IDeviceID deviceID, Filter globalTimeFilter) {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceID, globalTimeFilter);
}

public QueryStatement getQueryStatement() {
Expand Down
Loading

0 comments on commit 57e964a

Please sign in to comment.