Skip to content

Commit

Permalink
IGNITE-23156 Failed test added
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Sep 30, 2024
1 parent b1618a3 commit 07efd0e
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 39 deletions.
6 changes: 6 additions & 0 deletions modules/calcite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>ignite-spring</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.Predicate;
Expand Down Expand Up @@ -119,7 +121,10 @@ public enum ExecutorType {
THIN_VIA_QUERY,

/** */
THIN_JDBC
THIN_JDBC,

/** */
JDBC
}

/** */
Expand Down Expand Up @@ -198,23 +203,10 @@ public enum ModifyApi {
private static ClientConfiguration thinCliCfg;

/** */
private ThreadLocal<Connection> jdbcThinConn = ThreadLocal.withInitial(() -> {
try {
String addrs = partitionAwareness
? Ignition.allGrids().stream()
.filter(n -> !n.configuration().isClientMode())
.map(n -> "127.0.0.1:" + ((IgniteEx)n).context().clientListener().port())
.collect(Collectors.joining(","))
: "127.0.0.1:10800";
private static Map<Long, Connection> jdbcThinConn = new ConcurrentHashMap<>();

return DriverManager.getConnection("jdbc:ignite:thin://" + addrs + "?"
+ PROP_PREFIX + "partitionAwareness=" + partitionAwareness + "&"
+ PROP_PREFIX + "transactionConcurrency=" + txConcurrency);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
});
/** */
private static Map<Long, Connection> jdbcConn = new ConcurrentHashMap<>();

/** */
private final TransactionIsolation txIsolation = READ_COMMITTED;
Expand All @@ -238,21 +230,37 @@ public static Collection<?> parameters() {
for (boolean commit : new boolean[]{false, true}) {
for (boolean mutli : new boolean[] {false, true}) {
for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) {
ExecutorType[] nodeExecTypes = {ExecutorType.SERVER, ExecutorType.CLIENT};
ExecutorType[] nodeExecTypes = {/*ExecutorType.SERVER, ExecutorType.CLIENT,*/};

ExecutorType[] thinExecTypes;

if (modify == SQL) {
params.add(new Object[]{
modify,
ExecutorType.JDBC,
false, //partitionAwareness
cacheMode,
gridCnt,
backup,
commit,
mutli,
txConcurrency
});

thinExecTypes = new ExecutorType[]{
/*
ExecutorType.THIN_VIA_CACHE_API,
ExecutorType.THIN_VIA_QUERY,
ExecutorType.THIN_JDBC
ExecutorType.THIN_JDBC,
*/
};
}
else {
thinExecTypes = new ExecutorType[]{
/*
ExecutorType.THIN_VIA_CACHE_API,
ExecutorType.THIN_VIA_QUERY
*/
};
}

Expand Down Expand Up @@ -385,18 +393,37 @@ private void init() throws Exception {
}
}

/** */
private void clear() {
Consumer<Connection> cclose = c -> {
try {
c.close();
}
catch (SQLException e) {
throw new RuntimeException(e);
}
};

jdbcThinConn.values().forEach(cclose);
jdbcConn.values().forEach(cclose);
jdbcThinConn.clear();
jdbcConn.clear();

stopAllGrids();
}

/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();

stopAllGrids();
clear();
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

boolean reinit = gridCnt != Ignition.allGrids().size() - 1
boolean reinit = gridCnt() != Ignition.allGrids().size()
|| partitionAwareness != thinCliCfg.isPartitionAwarenessEnabled();

if (!reinit) {
Expand All @@ -407,7 +434,7 @@ private void init() throws Exception {
}

if (reinit) {
stopAllGrids();
clear();

init();
}
Expand All @@ -418,7 +445,7 @@ private void init() throws Exception {

insert(F.t(1, JOHN));

assertEquals(gridCnt + 1, Ignition.allGrids().size());
assertEquals(gridCnt(), Ignition.allGrids().size());
assertEquals(mode, cli.cache(tbl()).getConfiguration(CacheConfiguration.class).getCacheMode());
assertEquals(partitionAwareness, thinCliCfg.isPartitionAwarenessEnabled());
cli.cacheNames().stream()
Expand Down Expand Up @@ -502,7 +529,7 @@ public void testIndexScan() {
RunnableX check = () -> {
assertUsersSize(stepCnt * outOfTxSz);

if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertNull(select(id, CACHE));

assertNull(select(id, SQL));
Expand Down Expand Up @@ -715,7 +742,7 @@ public void testInsert() {

Runnable checkBefore = () -> {
for (int i = 4; i <= (multi ? 6 : 4); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertNull(CACHE.name(), select(i, CACHE));

assertNull(SQL.name(), select(i, SQL));
Expand All @@ -724,7 +751,7 @@ public void testInsert() {

Runnable checkAfter = () -> {
for (int i = 4; i <= (multi ? 6 : 4); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertEquals(CACHE.name(), JOHN, select(i, CACHE));

assertEquals(SQL.name(), JOHN, select(i, SQL));
Expand Down Expand Up @@ -769,7 +796,7 @@ public void testUpdate() {

Runnable checkBefore = () -> {
for (int i = 1; i <= (multi ? 3 : 1); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertEquals(JOHN, select(i, CACHE));

assertEquals(JOHN, select(i, SQL));
Expand All @@ -778,7 +805,7 @@ public void testUpdate() {

Runnable checkAfter = () -> {
for (int i = 1; i <= (multi ? 3 : 1); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertEquals(KYLE, select(i, CACHE));

assertEquals(KYLE, select(i, SQL));
Expand All @@ -796,7 +823,7 @@ public void testUpdate() {
update(F.t(1, SARAH));

for (int i = 1; i <= (multi ? 3 : 1); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertEquals(SARAH, select(i, CACHE));
assertEquals(SARAH, select(i, SQL));
}
Expand Down Expand Up @@ -825,7 +852,7 @@ public void testDelete() {

Runnable checkBefore = () -> {
for (int i = 1; i <= (multi ? 3 : 1); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertEquals(JOHN, select(i, CACHE));

assertEquals(JOHN, select(i, SQL));
Expand All @@ -834,7 +861,7 @@ public void testDelete() {

Runnable checkAfter = () -> {
for (int i = 1; i <= (multi ? 3 : 1); i++) {
if (type != ExecutorType.THIN_JDBC)
if (!isJdbc())
assertNull(select(i, CACHE));

assertNull(select(i, SQL));
Expand Down Expand Up @@ -864,7 +891,7 @@ public void testDelete() {
/** */
@Test
public void testVisibility() {
assumeFalse(type == ExecutorType.THIN_JDBC);
assumeFalse(isJdbc());

sql(format("DELETE FROM %s", tbl()));

Expand Down Expand Up @@ -913,9 +940,11 @@ private void insideTx(RunnableX test, boolean commit) {
tx.rollback();
}
}
else if (type == ExecutorType.THIN_JDBC) {
else if (type == ExecutorType.THIN_JDBC || type == ExecutorType.JDBC) {
Connection conn = jdbc();

try {
jdbcThinConn.get().setAutoCommit(false);
conn.setAutoCommit(false);
}
catch (SQLException e) {
throw new RuntimeException(e);
Expand All @@ -927,11 +956,11 @@ else if (type == ExecutorType.THIN_JDBC) {
finally {
try {
if (commit)
jdbcThinConn.get().commit();
conn.commit();
else
jdbcThinConn.get().rollback();
conn.rollback();

jdbcThinConn.get().setAutoCommit(true);
conn.setAutoCommit(true);
}
catch (SQLException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -1280,11 +1309,13 @@ public List<List<?>> sql(String sqlText, int[] parts, Object... args) {
return unwrapBinary(thinCli.query(qry).getAll());
else if (type == ExecutorType.THIN_VIA_CACHE_API)
return unwrapBinary(thinCli.cache(F.first(thinCli.cacheNames())).query(qry).getAll());
else if (type == ExecutorType.THIN_JDBC) {
else if (isJdbc()) {
assertTrue("Partition filter not supported", F.isEmpty(parts));

try {
PreparedStatement stmt = jdbcThinConn.get().prepareStatement(sqlText);
Connection conn = jdbc();

PreparedStatement stmt = conn.prepareStatement(sqlText);

if (!F.isEmpty(args)) {
for (int i = 0; i < args.length; i++)
Expand Down Expand Up @@ -1333,6 +1364,49 @@ private IgniteEx node() {
return type == ExecutorType.CLIENT ? cli : srv;
}

/** */
private Connection jdbc() {
assertTrue(isJdbc());

return type == ExecutorType.THIN_JDBC
? jdbcThinConn.computeIfAbsent(Thread.currentThread().getId(), k -> newThinJdbcConnection())
: jdbcConn.computeIfAbsent(Thread.currentThread().getId(), k -> newConnection());
}

/** */
private int gridCnt() {
return gridCnt + 1 + (type == ExecutorType.JDBC ? 1 : 0);
}

/** */
private Connection newConnection() {
try {
return DriverManager.getConnection("jdbc:ignite:cfg://jdbc-config.xml");
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}

/** */
private Connection newThinJdbcConnection() {
try {
String addrs = partitionAwareness
? Ignition.allGrids().stream()
.filter(n -> !n.configuration().isClientMode())
.map(n -> "127.0.0.1:" + ((IgniteEx)n).context().clientListener().port())
.collect(Collectors.joining(","))
: "127.0.0.1:10800";

return DriverManager.getConnection("jdbc:ignite:thin://" + addrs + "?"
+ PROP_PREFIX + "partitionAwareness=" + partitionAwareness + "&"
+ PROP_PREFIX + "transactionConcurrency=" + txConcurrency);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}

/** */
private int userPartition(int id) {
return affinity(cli.cache(users())).partition(id);
Expand Down Expand Up @@ -1366,6 +1440,11 @@ private void ensureModeSupported() {
);
}

/** */
private boolean isJdbc() {
return type == ExecutorType.THIN_JDBC || type == ExecutorType.JDBC;
}

/** */
private List<List<?>> unwrapBinary(List<List<?>> all) {
return all.stream()
Expand Down
Loading

0 comments on commit 07efd0e

Please sign in to comment.