Skip to content

Commit

Permalink
self review
Browse files Browse the repository at this point in the history
Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum committed Jun 12, 2024
1 parent 57e964a commit de95ccc
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,24 +534,14 @@ public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOEx
}

/**
* Get database name by path
* Get database name by device path
*
* <p>e.g., root.sg1 is a database and path = root.sg1.d1, return root.sg1
* <p>e.g., root.sg1 is a database and device path = root.sg1.d1, return root.sg1
*
* @param pathName only full path, cannot be path pattern
* @param deviceID only full device path, cannot be path pattern
* @return database in the given path
* @throws SemanticException no matched database
*/
@Override
public String getBelongedDatabase(String pathName) {
for (String database : databases) {
if (PathUtils.isStartWith(pathName, database)) {
return database;
}
}
throw new SemanticException("No matched database. Please check the path " + pathName);
}

@Override
public String getBelongedDatabase(IDeviceID deviceID) {
for (String database : databases) {
Expand All @@ -564,7 +554,7 @@ public String getBelongedDatabase(IDeviceID deviceID) {

@Override
public String getBelongedDatabase(PartialPath path) {
return getBelongedDatabase(path.getFullPath());
return getBelongedDatabase(path.getIDeviceID());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
DeviceSchemaInfo searchDeviceSchemaInfo(PartialPath devicePath, List<String> measurements);

/**
* Get database name by path
* Get database name by device path
*
* <p>e.g., root.sg1 is a database and path = root.sg1.d1, return root.sg1
* <p>e.g., root.sg1 is a database and device path = root.sg1.d1, return root.sg1
*
* @param pathName only full path, cannot be path pattern
* @param pathName only full device path, cannot be path pattern
* @return database in the given path
*/
String getBelongedDatabase(String pathName);

String getBelongedDatabase(IDeviceID pathName);

String getBelongedDatabase(PartialPath path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,8 @@ private void analyzeDeviceToSource(Analysis analysis, QueryStatement queryStatem
String deviceName = entry.getKey();
Set<Expression> sourceExpressionsUnderDevice = entry.getValue();
Set<String> queriedDevices = new HashSet<>();
// TODO: Change outputDeviceToQueriedDevicesMap to Map<IDeviceID, IDeviceID> to remove
// conversion
for (Expression expression : sourceExpressionsUnderDevice) {
queriedDevices.add(
ExpressionAnalyzer.getDeviceNameInSourceExpression(expression).toString());
Expand Down Expand Up @@ -1983,6 +1985,8 @@ private void analyzeDataPartition(
MPPQueryContext context) {
Set<IDeviceID> deviceSet = new HashSet<>();
if (queryStatement.isAlignByDevice()) {
// TODO: change OutputDeviceToQueriedDevicesMap to Map<IDeviceID, IDeviceID> to remove
// conversion
deviceSet =
analysis.getOutputDeviceToQueriedDevicesMap().values().stream()
.map(Factory.DEFAULT_FACTORY::create)
Expand Down Expand Up @@ -2010,11 +2014,11 @@ private DataPartition fetchDataPartitionByDevices(
CONFIG.getSeriesPartitionSlotNum());
}
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (IDeviceID deviceIDPath : deviceSet) {
for (IDeviceID deviceID : deviceSet) {
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(deviceIDPath, res.left, res.right.left, res.right.right);
new DataPartitionQueryParam(deviceID, res.left, res.right.left, res.right.right);
sgNameToQueryParamsMap
.computeIfAbsent(schemaTree.getBelongedDatabase(deviceIDPath), key -> new ArrayList<>())
.computeIfAbsent(schemaTree.getBelongedDatabase(deviceID), key -> new ArrayList<>())
.add(queryParam);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ private static DataPartition fetchDataPartitionByDevices(
CONFIG.getSeriesPartitionSlotNum());
}
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (IDeviceID deviceIDPath : deviceSet) {
for (IDeviceID deviceID : deviceSet) {
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(deviceIDPath, res.left, res.right.left, res.right.right);
new DataPartitionQueryParam(deviceID, res.left, res.right.left, res.right.right);
sgNameToQueryParamsMap
.computeIfAbsent(schemaTree.getBelongedDatabase(deviceIDPath), key -> new ArrayList<>())
.computeIfAbsent(schemaTree.getBelongedDatabase(deviceID), key -> new ArrayList<>())
.add(queryParam);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public List<PlanNode> visitSingleDeviceView(
}

String device = node.getDevice();
// TODO: remove conversion for device and OutputDeviceToQueriedDevicesMap
List<TRegionReplicaSet> regionReplicaSets =
!analysis.useLogicalView()
? new ArrayList<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ private static DataPartition fetchDataPartitionByDevices(
}

Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (IDeviceID deviceIDPath : deviceSet) {
for (IDeviceID deviceID : deviceSet) {
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(deviceIDPath, res.left, res.right.left, res.right.right);
new DataPartitionQueryParam(deviceID, res.left, res.right.left, res.right.right);
sgNameToQueryParamsMap.computeIfAbsent(database, key -> new ArrayList<>()).add(queryParam);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ public class PartitionCacheTest {
}
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) {
// init each device
String deviceName = getDeviceName(storageGroupName, deviceNumber);
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName, deviceNumber));
TSeriesPartitionSlot seriesPartitionSlot =
new TSeriesPartitionSlot(
partitionExecutor.getSeriesPartitionSlot(
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)));
new TSeriesPartitionSlot(partitionExecutor.getSeriesPartitionSlot(deviceID));
// init schemaRegion of device
TConsensusGroupId schemaConsensusGroupId =
new TConsensusGroupId(
Expand Down Expand Up @@ -290,12 +289,12 @@ public void testSchemaRegionCache() {
storageGroupNumber++) {
String storageGroupName = getStorageGroupName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) {
IDeviceID deviceName =
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName, deviceNumber));
TSeriesPartitionSlot seriesPartitionSlot =
partitionExecutor.getSeriesPartitionSlot(deviceName);
partitionExecutor.getSeriesPartitionSlot(deviceID);
Map<String, List<IDeviceID>> searchMap = new HashMap<>();
searchMap.put(storageGroupName, Collections.singletonList(deviceName));
searchMap.put(storageGroupName, Collections.singletonList(deviceID));
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(searchMap);
assertNotNull(schemaPartition);
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> result =
Expand All @@ -311,10 +310,10 @@ public void testSchemaRegionCache() {
List<String> missedStorageGroupNames = Arrays.asList("root.sg", "root.*");
for (String missedStorageGroupName : missedStorageGroupNames) {
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) {
IDeviceID deviceName =
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(missedStorageGroupName, deviceNumber));
Map<String, List<IDeviceID>> searchMap = new HashMap<>();
searchMap.put(missedStorageGroupName, Collections.singletonList(deviceName));
searchMap.put(missedStorageGroupName, Collections.singletonList(deviceID));
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(searchMap);
assertNull(schemaPartition);
}
Expand All @@ -327,10 +326,10 @@ public void testSchemaRegionCache() {
for (int deviceNumber = DEVICE_PER_STORAGE_GROUP;
deviceNumber < 2 * DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
IDeviceID deviceName =
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName, deviceNumber));
Map<String, List<IDeviceID>> searchMap = new HashMap<>();
searchMap.put(storageGroupName, Collections.singletonList(deviceName));
searchMap.put(storageGroupName, Collections.singletonList(deviceID));
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(searchMap);
assertNull(schemaPartition);
}
Expand All @@ -342,10 +341,10 @@ public void testSchemaRegionCache() {
storageGroupNumber++) {
String storageGroupName = getStorageGroupName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) {
IDeviceID deviceName =
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName, deviceNumber));
Map<String, List<IDeviceID>> searchMap = new HashMap<>();
searchMap.put(storageGroupName, Collections.singletonList(deviceName));
searchMap.put(storageGroupName, Collections.singletonList(deviceID));
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(searchMap);
assertNull(schemaPartition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ public static Analysis constructAnalysis() {
schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);

schemaPartitionMap.put("root.sg", schemaRegionMap);
schemaPartition.setSchemaPartitionMap(schemaPartitionMap);

Expand Down

0 comments on commit de95ccc

Please sign in to comment.