Skip to content

Commit

Permalink
Merge branch 'iotdb-rc133' into enhanceAutoRepair-rc133
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei committed Sep 26, 2024
2 parents 4cb5325 + f27a249 commit 91e88e3
Show file tree
Hide file tree
Showing 71 changed files with 916 additions and 753 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,14 @@ public void testBigDateTime() {
e.printStackTrace();
fail();
}
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("insert into root.sg.d1(time,s2) values (16182830055860000000, 8.76);");
fail();
} catch (SQLException e) {
Assert.assertTrue(
e.getMessage()
.contains("please check whether the timestamp 16182830055860000000 is correct."));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -470,7 +471,7 @@ public void testAlterPipeSourceAndProcessor() {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
} catch (final SQLException e) {
fail(e.getMessage());
}

Expand All @@ -491,12 +492,13 @@ public void testAlterPipeSourceAndProcessor() {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from root.db.**", "Time,root.db.d1.at1,", expectedResSet);

// Alter pipe (modify 'source.path' and 'processor.tumbling-time.interval-seconds')
// Alter pipe (modify 'source.path', 'source.inclusion' and
// 'processor.tumbling-time.interval-seconds')
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
"alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**') modify processor ('processor.tumbling-time.interval-seconds'='2')");
} catch (SQLException e) {
"alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**', 'source.inclusion'='all') modify processor ('processor.tumbling-time.interval-seconds'='2')");
} catch (final SQLException e) {
fail(e.getMessage());
}

Expand Down Expand Up @@ -527,5 +529,15 @@ public void testAlterPipeSourceAndProcessor() {
"select * from root.db.** where time > 10000",
"Time,root.db.d1.at1,root.db.d2.at1,",
expectedResSet);

// Create database on sender
if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv, "create timeSeries root.db.d2.at2 int32")) {
fail();
}

// Check database on receiver
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeSeries", "count(timeseries),", Collections.singleton("3,"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public static void setSourceFullPath(final String sourceFullPath) {
}

public static int getSourceFullPathLength() {
return ImportTsFileScanTool.sourceFullPath.length();
return new File(sourceFullPath).isDirectory()
? ImportTsFileScanTool.sourceFullPath.length()
: new File(ImportTsFileScanTool.sourceFullPath).getParent().length();
}

public static int getTsFileQueueSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,10 @@ private void formulateFolders() {
pipeReceiverFileDir = addHomeDir(pipeReceiverFileDir);
}

private String addHomeDir(String dir) {
String homeDir = System.getProperty(ConfigNodeConstant.CONFIGNODE_HOME, null);
if (!new File(dir).isAbsolute() && homeDir != null && homeDir.length() > 0) {
if (!homeDir.endsWith(File.separator)) {
dir = homeDir + File.separatorChar + dir;
} else {
dir = homeDir + dir;
}
public static String addHomeDir(String dir) {
final String homeDir = System.getProperty(ConfigNodeConstant.CONFIGNODE_HOME, null);
if (!new File(dir).isAbsolute() && homeDir != null && !homeDir.isEmpty()) {
dir = !homeDir.endsWith(File.separator) ? homeDir + File.separatorChar + dir : homeDir + dir;
}
return dir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
package org.apache.iotdb.confignode.consensus.request;

import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan;
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.UpdateAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
Expand All @@ -43,6 +49,10 @@
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
Expand Down Expand Up @@ -162,6 +172,18 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case RemoveDataNode:
plan = new RemoveDataNodePlan();
break;
case RegisterAINode:
plan = new RegisterAINodePlan();
break;
case RemoveAINode:
plan = new RemoveAINodePlan();
break;
case GetAINodeConfiguration:
plan = new GetAINodeConfigurationPlan();
break;
case UpdateAINodeConfiguration:
plan = new UpdateAINodePlan();
break;
case CreateDatabase:
plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase);
break;
Expand Down Expand Up @@ -400,6 +422,24 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case UPDATE_CQ_LAST_EXEC_TIME:
plan = new UpdateCQLastExecTimePlan();
break;
case CreateModel:
plan = new CreateModelPlan();
break;
case UpdateModelInfo:
plan = new UpdateModelInfoPlan();
break;
case DropModel:
plan = new DropModelPlan();
break;
case ShowModel:
plan = new ShowModelPlan();
break;
case DropModelInNode:
plan = new DropModelInNodePlan();
break;
case GetModelInfo:
plan = new GetModelInfoPlan();
break;
case CreatePipePlugin:
plan = new CreatePipePluginPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ protected ConfigPhysicalReadPlan(final ConfigPhysicalPlanType type) {

@Override
protected void serializeImpl(final DataOutputStream stream) throws IOException {
throw new UnsupportedOperationException("Read request does not need to be serialized.");
// Read request does not need to be serialized
}

@Override
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
throw new UnsupportedOperationException("Read request does not need to be deserialized.");
// Read request does not need to be deserialized
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class GetAINodeConfigurationPlan extends ConfigPhysicalReadPlan {

// if aiNodeId is set to -1, return all AINode configurations.
private final int aiNodeId;
private int aiNodeId;

public GetAINodeConfigurationPlan() {
super(ConfigPhysicalPlanType.GetAINodeConfiguration);
}

public GetAINodeConfigurationPlan(final int aiNodeId) {
super(ConfigPhysicalPlanType.GetAINodeConfiguration);
Expand All @@ -36,6 +44,17 @@ public int getAiNodeId() {
return aiNodeId;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
stream.writeInt(aiNodeId);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
this.aiNodeId = buffer.getInt();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class GetModelInfoPlan extends ConfigPhysicalReadPlan {

private final String modelId;
private String modelId;

public GetModelInfoPlan() {
super(ConfigPhysicalPlanType.GetModelInfo);
}

public GetModelInfoPlan(final TGetModelInfoReq getModelInfoReq) {
super(ConfigPhysicalPlanType.GetModelInfo);
Expand All @@ -38,6 +47,17 @@ public String getModelId() {
return modelId;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(modelId, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
this.modelId = ReadWriteIOUtils.readString(buffer);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,21 @@
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class ShowModelPlan extends ConfigPhysicalReadPlan {

private String modelName;

public ShowModelPlan() {
super(ConfigPhysicalPlanType.ShowModel);
}

public ShowModelPlan(final TShowModelReq showModelReq) {
super(ConfigPhysicalPlanType.ShowModel);
if (showModelReq.isSetModelId()) {
Expand All @@ -44,6 +53,21 @@ public String getModelName() {
return modelName;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(modelName != null, stream);
ReadWriteIOUtils.write(modelName, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
boolean isSetModelId = ReadWriteIOUtils.readBool(buffer);
if (isSetModelId) {
this.modelName = ReadWriteIOUtils.readString(buffer);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeConfigNodeRuntimeAgent implements IService {
Expand Down Expand Up @@ -90,12 +89,13 @@ public ConfigRegionListeningQueue listener() {
return regionListener.listener();
}

public void increaseListenerReference(PipeParameters parameters) throws IllegalPathException {
public void increaseListenerReference(final PipeParameters parameters)
throws IllegalPathException {
regionListener.increaseReference(parameters);
}

public void decreaseListenerReference(PipeParameters parameters)
throws IllegalPathException, IOException {
public void decreaseListenerReference(final PipeParameters parameters)
throws IllegalPathException {
regionListener.decreaseReference(parameters);
}

Expand All @@ -120,15 +120,16 @@ public boolean isLeaderReady() {

//////////////////////////// Runtime Exception Handlers ////////////////////////////

public void report(EnrichedEvent event, PipeRuntimeException pipeRuntimeException) {
public void report(final EnrichedEvent event, final PipeRuntimeException pipeRuntimeException) {
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
LOGGER.warn("Attempt to report pipe exception to a null PipeTaskMeta.", pipeRuntimeException);
}
}

private void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
private void report(
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
LOGGER.warn(
"Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
pipeTaskMeta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningQueue;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeConfigRegionListener {
Expand All @@ -38,7 +37,7 @@ public synchronized ConfigRegionListeningQueue listener() {
return listeningQueue;
}

public synchronized void increaseReference(PipeParameters parameters)
public synchronized void increaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
listeningQueueReferenceCount++;
Expand All @@ -48,8 +47,8 @@ public synchronized void increaseReference(PipeParameters parameters)
}
}

public synchronized void decreaseReference(PipeParameters parameters)
throws IllegalPathException, IOException {
public synchronized void decreaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
listeningQueueReferenceCount--;
if (listeningQueueReferenceCount == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.manager.pipe.connector.client.IoTDBConfigNodeSyncClientManager;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotPieceReq;
Expand Down Expand Up @@ -68,7 +69,7 @@ protected IoTDBSyncClientManager constructClient(
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
useSSL,
trustStorePath,
Objects.nonNull(trustStorePath) ? ConfigNodeConfig.addHomeDir(trustStorePath) : null,
trustStorePwd,
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
Expand Down
Loading

0 comments on commit 91e88e3

Please sign in to comment.