Skip to content

Commit

Permalink
IGNITE-23090 Start transaction on the first query
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Oct 5, 2024
1 parent f1bb463 commit d6cfa04
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.sql.SQLException;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcThinFeature;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -564,7 +565,7 @@ public void setPartitionAwarenessPartitionDistributionsCacheSize(
/**
* @return Transaction concurrency value.
*/
public String getTransactionConcurrency();
public TransactionConcurrency getTransactionConcurrency();

/**
* Sets transaction concurrency.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
}

/** {@inheritDoc} */
@Override public String getTransactionConcurrency() {
return transactionConcurrency.value();
@Override public TransactionConcurrency getTransactionConcurrency() {
return TransactionConcurrency.valueOf(transactionConcurrency.value());
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -109,10 +110,9 @@
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultWithIo;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcSetTxParametersRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcTxEndRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcTxStartRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcTxStartResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcUpdateBinarySchemaResult;
import org.apache.ignite.internal.sql.command.SqlCommand;
import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
Expand All @@ -128,7 +128,6 @@
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -197,9 +196,6 @@ public class JdbcThinConnection implements Connection {
/** Current transaction isolation. */
private int txIsolation;

/** Transaction concurrency */
private final TransactionConcurrency txConcurrency;

/** Auto-commit flag. */
private boolean autoCommit;

Expand Down Expand Up @@ -311,7 +307,8 @@ public JdbcThinConnection(ConnectionProperties connProps) throws SQLException {

holdability = txSupportedOnServer() ? CLOSE_CURSORS_AT_COMMIT : HOLD_CURSORS_OVER_COMMIT;
txIsolation = defaultTransactionIsolation();
txConcurrency = TransactionConcurrency.valueOf(connProps.getTransactionConcurrency());

updateTransactionParameters();
}

/** Create new binary context. */
Expand Down Expand Up @@ -402,6 +399,27 @@ void endTransactionIfExists(boolean commit) throws SQLException {
txCtx = null;
}

/** Updates transaction parameters on all known servers. */
private void updateTransactionParameters() throws SQLException {
if (!txSupportedOnServer())
return;

//TODO: FIX case when txIsolation == 0;
JdbcSetTxParametersRequest req = new JdbcSetTxParametersRequest(
connProps.getTransactionConcurrency(),
txIsolation == TRANSACTION_NONE ? null : isolation(txIsolation),
connProps.getTransactionTimeout(),
connProps.getTransactionLabel()
);

if (partitionAwareness) {
for (JdbcThinTcpIo io : ios.values())
sendRequest(req, null, io);
}
else
sendRequest(req, null, singleIo);
}

/**
* @param sql Statement.
* @param cmd Parsed form of {@code sql}.
Expand Down Expand Up @@ -715,6 +733,8 @@ private void checkCursorOptions(int resSetType, int resSetConcurrency, int resSe
}

txIsolation = level;

updateTransactionParameters();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -991,25 +1011,28 @@ private void checkCursorOptions(int resSetType, int resSetConcurrency, int resSe
return netTimeout;
}

/**
* Opens transaction if required.
* @throws SQLException If failed.
*/
TxContext transactionContext() throws SQLException {
assert txEnabledForConnection();

// Transaction in autoCommit mode must be closed on ResultSet close.
if (isTxOpen() && !autoCommit)
return txCtx;
/** Store transaction context. */
void txContext(JdbcThinTcpIo txIo, int txId) {
if (!txEnabledForConnection() || autoCommit)
return;

assert txCtx == null || autoCommit;
assert txId != NO_TX;

TxContext txCtx0 = new TxContext(txIsolation, txConcurrency);
// Check same context returned from server when transaction exists, already.
if (txCtx != null && (txCtx.txId() != txId || !Objects.equals(txCtx.txIo().nodeId(), txIo.nodeId()))) {
throw new IllegalStateException("Nested transactions not supported [" +
"txCtx.txId=" + txId +
", txCtx.nodeId=" + txCtx.txIo.nodeId() +
", new.txId=" + txId +
", new.nodeId=" + txIo.nodeId() + ']');
}

if (!autoCommit)
txCtx = txCtx0;
txCtx = new TxContext(txIo, txId);
}

return txCtx0;
/** @return Current transaction context. */
@Nullable TxContext txContext() {
return txCtx;
}

/**
Expand Down Expand Up @@ -2586,10 +2609,10 @@ boolean handleResult(long reqId, JdbcResult res) {
/** Transaction context. */
public class TxContext {
/** IO to transaction coordinator. */
JdbcThinTcpIo txIo;
final JdbcThinTcpIo txIo;

/** Transaction id. */
int txId;
final int txId;

/** Closed flag. */
boolean closed;
Expand All @@ -2598,16 +2621,9 @@ public class TxContext {
private final Set<JdbcThinStatement> stmts = Collections.newSetFromMap(new IdentityHashMap<>());

/** */
public TxContext(int jdbcIsolation, TransactionConcurrency transactionConcurrency) throws SQLException {
JdbcResultWithIo res = sendRequest(new JdbcTxStartRequest(
transactionConcurrency,
isolation(jdbcIsolation),
connProps.getTransactionTimeout(),
connProps.getTransactionLabel()
));

txIo = res.cliIo();
txId = ((JdbcTxStartResult)res.response()).txId();
public TxContext(JdbcThinTcpIo txIo, int txId) {
this.txIo = txIo;
this.txId = txId;
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,78 +231,76 @@ protected void execute0(JdbcStatementType stmtType, String sql, List<Object> arg
}

boolean autoCommit = conn.getAutoCommit();
boolean txEnabled = conn.txEnabledForConnection();

TxContext txCtx = txEnabled ? conn.transactionContext() : null;
TxContext rsetCtx = autoCommit ? txCtx : null; // Commit transaction on ResultSet#close only for autoCommit mode.
TxContext txCtx = conn.txContext();

if (txCtx != null)
txCtx.track(this);

try {
JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(stmtType, schema, pageSize,
maxRows, autoCommit, explicitTimeout, sql, args == null ? null : args.toArray(new Object[args.size()]),
txEnabled ? txCtx.txId() : NO_TX);
JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(
stmtType,
schema,
pageSize,
maxRows,
autoCommit,
explicitTimeout,
sql,
args == null ? null : args.toArray(new Object[args.size()]),
txCtx == null ? NO_TX : txCtx.txId
);

JdbcResultWithIo resWithIo = conn.sendRequest(req, this, null);
JdbcResultWithIo resWithIo = conn.sendRequest(req, this, null);

JdbcResult res0 = resWithIo.response();
JdbcResult res0 = resWithIo.response();

JdbcThinTcpIo stickyIo = resWithIo.cliIo();
JdbcThinTcpIo stickyIo = resWithIo.cliIo();

assert res0 != null;
assert res0 != null;

if (res0 instanceof JdbcBulkLoadAckResult)
res0 = sendFile((JdbcBulkLoadAckResult)res0, stickyIo);
if (res0 instanceof JdbcBulkLoadAckResult)
res0 = sendFile((JdbcBulkLoadAckResult)res0, stickyIo);

if (res0 instanceof JdbcQueryExecuteResult) {
JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0;
if (res0 instanceof JdbcQueryExecuteResult) {
JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0;

resultSets = Collections.singletonList(new JdbcThinResultSet(this, res.cursorId(), pageSize,
res.last(), res.items(), res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(),
closeOnCompletion, stickyIo));
}
else if (res0 instanceof JdbcQueryExecuteMultipleStatementsResult) {
JdbcQueryExecuteMultipleStatementsResult res = (JdbcQueryExecuteMultipleStatementsResult)res0;
conn.txContext(stickyIo, res.txId());

resultSets = Collections.singletonList(new JdbcThinResultSet(this, res.cursorId(), pageSize,
res.last(), res.items(), res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(),
closeOnCompletion, stickyIo));
}
else if (res0 instanceof JdbcQueryExecuteMultipleStatementsResult) {
JdbcQueryExecuteMultipleStatementsResult res = (JdbcQueryExecuteMultipleStatementsResult)res0;

conn.txContext(stickyIo, res.txId());

List<JdbcResultInfo> resInfos = res.results();

List<JdbcResultInfo> resInfos = res.results();
resultSets = new ArrayList<>(resInfos.size());

resultSets = new ArrayList<>(resInfos.size());
boolean firstRes = true;

boolean firstRes = true;
for (JdbcResultInfo rsInfo : resInfos) {
if (!rsInfo.isQuery())
resultSets.add(resultSetForUpdate(rsInfo.updateCount()));
else {
if (firstRes) {
firstRes = false;

for (JdbcResultInfo rsInfo : resInfos) {
if (!rsInfo.isQuery())
resultSets.add(resultSetForUpdate(rsInfo.updateCount()));
resultSets.add(new JdbcThinResultSet(this, rsInfo.cursorId(), pageSize, res.isLast(),
res.items(), true, conn.autoCloseServerCursor(), -1, closeOnCompletion,
stickyIo));
}
else {
if (firstRes) {
firstRes = false;

resultSets.add(new JdbcThinResultSet(this, rsInfo.cursorId(), pageSize, res.isLast(),
res.items(), true, conn.autoCloseServerCursor(), -1, closeOnCompletion,
stickyIo));
}
else {
resultSets.add(new JdbcThinResultSet(this, rsInfo.cursorId(), pageSize, false,
null, true, conn.autoCloseServerCursor(), -1, closeOnCompletion,
stickyIo));
}
resultSets.add(new JdbcThinResultSet(this, rsInfo.cursorId(), pageSize, false,
null, true, conn.autoCloseServerCursor(), -1, closeOnCompletion,
stickyIo));
}
}
}
else
throw new SQLException("Unexpected result [res=" + res0 + ']');

if (autoCommit && txEnabled)
txCtx.end(true);
}
catch (Exception e) {
// Rollback in case of error.
if (autoCommit && txEnabled)
txCtx.end(false);

throw e;
}
else
throw new SQLException("Unexpected result [res=" + res0 + ']');

assert !resultSets.isEmpty() : "At least one results set is expected";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
* Base connection context.
*/
public abstract class ClientListenerAbstractConnectionContext implements ClientListenerConnectionContext {
/** */
public static final int NO_TX = 0;

/** Kernal context. */
protected final GridKernalContext ctx;

Expand Down Expand Up @@ -162,7 +165,7 @@ public String clientDescriptor() {
public int nextTxId() {
int txId = txIdSeq.incrementAndGet();

return txId == 0 ? txIdSeq.incrementAndGet() : txId;
return txId == NO_TX ? txIdSeq.incrementAndGet() : txId;
}

/**
Expand Down
Loading

0 comments on commit d6cfa04

Please sign in to comment.