Skip to content

Commit

Permalink
Pipe: Fix PipeWriteBackSink using toTPipeTransferReq causing NPE due …
Browse files Browse the repository at this point in the history
…to uninitialized buffer
  • Loading branch information
luoluoyuyu committed Jan 10, 2025
1 parent 7f0ac97 commit e716ee9
Showing 1 changed file with 154 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.iotdb.db.pipe.connector.protocol.writeback;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
Expand All @@ -30,7 +33,18 @@
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
Expand All @@ -42,11 +56,20 @@
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;

public class WriteBackConnector implements PipeConnector {

Expand All @@ -60,9 +83,13 @@ public class WriteBackConnector implements PipeConnector {

private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;

private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();

private static final Set<String> ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet();

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
// Do nothing
final PipeParameters parameters = validator.getParameters();
}

@Override
Expand All @@ -78,6 +105,11 @@ public void customize(
environment.getPipeName(),
environment.getCreationTime(),
environment.getRegionId()));
SESSION_MANAGER.supplySession(
session,
AuthorityChecker.SUPER_USER,
ZoneId.systemDefault(),
IoTDBConstant.ClientVersion.V_1_0);
SESSION_MANAGER.registerSession(session);
}

Expand Down Expand Up @@ -137,16 +169,22 @@ private void doTransfer(
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;

final InsertBaseStatement insertBaseStatement;
if (Objects.nonNull(insertNode)) {
insertBaseStatement =
PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer(), dataBaseName)
.constructStatement();
} else {
insertBaseStatement =
PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, dataBaseName)
.constructStatement();
}

final TSStatus status =
PipeDataNodeAgent.receiver()
.thrift()
.receive(
Objects.isNull(insertNode)
? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer(), dataBaseName)
: PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
insertNode, dataBaseName))
.getStatus();
insertBaseStatement.isWriteToTable()
? executeStatementForTableModel(insertBaseStatement, dataBaseName)
: executeStatementForTreeModel(insertBaseStatement);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
Expand All @@ -170,17 +208,22 @@ private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletIn

private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
throws PipeException {
final String dataBaseName =
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;

final InsertTabletStatement insertTabletStatement =
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned(),
dataBaseName)
.constructStatement();

final TSStatus status =
PipeDataNodeAgent.receiver()
.thrift()
.receive(
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER))
.getStatus();
insertTabletStatement.isWriteToTable()
? executeStatementForTableModel(insertTabletStatement, dataBaseName)
: executeStatementForTreeModel(insertTabletStatement);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
Expand All @@ -201,4 +244,94 @@ public void close() throws Exception {
}
SESSION_MANAGER.removeCurrSession();
}

private TSStatus executeStatementForTableModel(Statement statement, String dataBaseName) {
try {
autoCreateDatabaseIfNecessary(dataBaseName);

return Coordinator.getInstance()
.executeForTableModel(
new PipeEnrichedStatement(statement),
RELATIONAL_SQL_PARSER,
SESSION_MANAGER.getCurrSession(),
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfoOfPipeReceiver(
SESSION_MANAGER.getCurrSession(), dataBaseName),
"",
LocalExecutionPlanner.getInstance().metadata,
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
.status;
} catch (final Exception e) {
ALREADY_CREATED_DATABASES.remove(dataBaseName);

final Throwable rootCause = getRootCause(e);
if (rootCause.getMessage() != null
&& rootCause
.getMessage()
.toLowerCase(Locale.ENGLISH)
.contains(DATABASE_NOT_SET.toLowerCase(Locale.ENGLISH))) {
autoCreateDatabaseIfNecessary(dataBaseName);

// Retry after creating the database
return Coordinator.getInstance()
.executeForTableModel(
new PipeEnrichedStatement(statement),
RELATIONAL_SQL_PARSER,
SESSION_MANAGER.getCurrSession(),
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfoOfPipeReceiver(
SESSION_MANAGER.getCurrSession(), dataBaseName),
"",
LocalExecutionPlanner.getInstance().metadata,
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
.status;
}

// If the exception is not caused by database not set, throw it directly
throw e;
}
}

private void autoCreateDatabaseIfNecessary(final String database) {
if (ALREADY_CREATED_DATABASES.contains(database)) {
return;
}

final TDatabaseSchema schema = new TDatabaseSchema(new TDatabaseSchema(database));
schema.setIsTableModel(true);

final CreateDBTask task = new CreateDBTask(schema, true);
try {
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
final ConfigTaskResult result = future.get();
if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Auto create database failed: %s, status code: %s",
database, result.getStatusCode()));
}
} catch (final ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new PipeException("Auto create database failed because: " + e.getMessage());
}

ALREADY_CREATED_DATABASES.add(database);
}

private TSStatus executeStatementForTreeModel(final Statement statement) {
return Coordinator.getInstance()
.executeForTreeModel(
new PipeEnrichedStatement(statement),
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false)
.status;
}
}

0 comments on commit e716ee9

Please sign in to comment.