diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index 974113a04016a..6b5a2661939e5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -114,6 +114,8 @@ import org.apache.ignite.internal.util.typedef.F; import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED; +import static org.apache.calcite.sql.SqlKind.IS_DISTINCT_FROM; +import static org.apache.calcite.sql.SqlKind.IS_NOT_DISTINCT_FROM; import static org.apache.ignite.internal.processors.query.calcite.util.TypeUtils.combinedRowType; /** @@ -298,7 +300,8 @@ public LogicalRelImplementor( Comparator comp = expressionFactory.comparator( rel.leftCollation().getFieldCollations().subList(0, pairsCnt), - rel.rightCollation().getFieldCollations().subList(0, pairsCnt) + rel.rightCollation().getFieldCollations().subList(0, pairsCnt), + rel.getCondition().getKind() == IS_NOT_DISTINCT_FROM || rel.getCondition().getKind() == IS_DISTINCT_FROM ); Node node = MergeJoinNode.create(ctx, outType, leftType, rightType, joinType, comp, hasExchange(rel)); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java index a68078f6c421a..3fb64377322ec 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java @@ -60,9 +60,11 @@ Supplier>> accumulatorsFactory( * * @param left Collations of left row. * @param right Collations of right row. + * @param nullsEqual If {@code true}, nulls are considered equal. Usually, NULL <> NULL in SQL. So, the value should + * be {@code false}. Except cases with IS DISTINCT / IS NOT DISTINCT. * @return Rows comparator. */ - Comparator comparator(List left, List right); + Comparator comparator(List left, List right, boolean nullsEqual); /** * Creates a Filter predicate. diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java index d6fb106667b12..2779a2a03b418 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java @@ -193,7 +193,7 @@ else if (c2 != HIGHEST_VALUE) } /** {@inheritDoc} */ - @Override public Comparator comparator(List left, List right) { + @Override public Comparator comparator(List left, List right, boolean nullsEqual) { if (F.isEmpty(left) || F.isEmpty(right) || left.size() != right.size()) throw new IllegalArgumentException("Both inputs should be non-empty and have the same size: left=" + (left != null ? left.size() : "null") + ", right=" + (right != null ? right.size() : "null")); @@ -238,7 +238,8 @@ else if (c2 != HIGHEST_VALUE) } // If compared rows contain NULLs, they shouldn't be treated as equals, since NULL <> NULL in SQL. - return hasNulls ? 1 : 0; + // Except cases with IS DISTINCT / IS NOT DISTINCT. + return hasNulls && !nullsEqual ? 1 : 0; } }; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index c0e2bb2267c02..b1d78912d49e9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -127,7 +127,7 @@ public CorrelatedNestedLoopJoinNode(ExecutionContext ctx, RelDataType rowTy /** {@inheritDoc} */ @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 2; - assert rowsCnt > 0 && requested == 0; + assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested; checkState(); @@ -430,14 +430,21 @@ private void join() throws Exception { int notMatchedIdx = leftMatched.nextClearBit(0); - while (requested > 0 && notMatchedIdx < leftInBuf.size()) { - downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow)); + state = State.IN_LOOP; - requested--; + try { + while (requested > 0 && notMatchedIdx < leftInBuf.size()) { + requested--; + + downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow)); - leftMatched.set(notMatchedIdx); + leftMatched.set(notMatchedIdx); - notMatchedIdx = leftMatched.nextClearBit(notMatchedIdx + 1); + notMatchedIdx = leftMatched.nextClearBit(notMatchedIdx + 1); + } + } + finally { + state = State.IDLE; } if (requested == 0 && notMatchedIdx < leftInBuf.size()) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteTypeCoercion.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteTypeCoercion.java index 0528f37a09006..8cba041988535 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteTypeCoercion.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteTypeCoercion.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.nio.charset.Charset; +import java.util.Arrays; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.rel.type.DynamicRecordType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlCollation; import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; @@ -33,6 +35,7 @@ import org.apache.calcite.sql.SqlUserDefinedTypeNameSpec; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorScope; @@ -55,6 +58,41 @@ public IgniteTypeCoercion(RelDataTypeFactory typeFactory, SqlValidator validator super(typeFactory, validator); } + /** {@inheritDoc} **/ + @Override public boolean binaryComparisonCoercion(SqlCallBinding binding) { + // Although it is not reflected in the docs, this method is also invoked for MAX, MIN (and other similar operators) + // by ComparableOperandTypeChecker. When that is the case, fallback to default rules. + SqlCall call = binding.getCall(); + + if (binding.getOperandCount() != 2 || !SqlKind.BINARY_COMPARISON.contains(call.getKind())) + return super.binaryComparisonCoercion(binding); + + SqlValidatorScope scope = binding.getScope(); + + RelDataType leftType = validator.deriveType(scope, call.operand(0)); + RelDataType rightType = validator.deriveType(scope, call.operand(1)); + + if (leftType.equals(rightType)) + return super.binaryComparisonCoercion(binding); + else { + // Find the least restrictive type among the operand types + // and coerce the operands to that type if such type exists. + // + // An example of a least restrictive type from the javadoc for RelDataTypeFactory::leastRestrictive: + // leastRestrictive(INT, NUMERIC(3, 2)) could be NUMERIC(12, 2) + // + // A least restrictive type between types of different type families does not exist - + // the method returns null (See SqlTypeFactoryImpl::leastRestrictive). + // + RelDataType targetType = factory.leastRestrictive(Arrays.asList(leftType, rightType)); + + if (targetType == null || targetType.getFamily() == SqlTypeFamily.ANY) + return super.binaryComparisonCoercion(binding); + else + return coerceOperandsType(scope, call, targetType); + } + } + /** {@inheritDoc} */ @Override protected boolean coerceOperandType( SqlValidatorScope scope, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java index c6782cab398e6..9e4269a5d97bb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceIntersect; import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; /** * Set op (MINUS, INTERSECT) converter rule. @@ -77,6 +78,8 @@ private abstract static class ColocatedSetOpConverterRule exten RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single()); List inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait)); + inputs = Commons.castToLeastRestrictiveIfRequired(inputs, cluster, inTrait); + return createNode(cluster, outTrait, inputs, setOp.all); } } @@ -131,6 +134,8 @@ abstract PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); List inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait)); + inputs = Commons.castToLeastRestrictiveIfRequired(inputs, cluster, inTrait); + RelNode map = createMapNode(cluster, outTrait, inputs, setOp.all); return createReduceNode( diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java index df7944cb006a3..6b106a49f15f5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/UnionConverterRule.java @@ -47,12 +47,14 @@ public UnionConverterRule(Config cfg) { /** {@inheritDoc} */ @Override public void onMatch(RelOptRuleCall call) { - final LogicalUnion union = call.rel(0); + LogicalUnion union = call.rel(0); RelOptCluster cluster = union.getCluster(); RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE); List inputs = Commons.transform(union.getInputs(), input -> convert(input, traits)); + inputs = Commons.castToLeastRestrictiveIfRequired(inputs, cluster, traits); + RelNode res = new IgniteUnionAll(cluster, traits, inputs); if (!union.all) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java index 00de072af2120..954b9e64ddf07 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/IgniteTypeFactory.java @@ -27,6 +27,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.Period; +import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,12 +43,23 @@ import org.apache.calcite.sql.type.BasicSqlType; import org.apache.calcite.sql.type.IntervalSqlType; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.typedef.F; /** * Ignite type factory. */ public class IgniteTypeFactory extends JavaTypeFactoryImpl { + /** */ + private static final EnumMap UNSUPPORTED_TYPES = new EnumMap<>(SqlTypeName.class); + + static { + UNSUPPORTED_TYPES.put(SqlTypeName.TIME_TZ, "TIME WITH TIME ZONE"); + UNSUPPORTED_TYPES.put(SqlTypeName.TIMESTAMP_TZ, "TIMESTAMP WITH TIME ZONE"); + UNSUPPORTED_TYPES.put(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "TIMESTAMP WITH LOCAL TIME ZONE"); + UNSUPPORTED_TYPES.put(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, "TIME WITH LOCAL TIME ZONE"); + } + /** Interval qualifier to create year-month interval types. */ private static final SqlIntervalQualifier INTERVAL_QUALIFIER_YEAR_MONTH = new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO); @@ -133,9 +145,8 @@ else if (type instanceof BasicSqlType || type instanceof IntervalSqlType) { return Enum.class; case ANY: case OTHER: - return Object.class; case NULL: - return Void.class; + return Object.class; default: break; } @@ -345,6 +356,35 @@ private boolean allEquals(List types) { return true; } + /** {@inheritDoc} */ + @Override public RelDataType createSqlType(SqlTypeName typeName) { + checkUnsupportedType(typeName); + + return super.createSqlType(typeName); + } + + /** {@inheritDoc} */ + @Override public RelDataType createSqlType(SqlTypeName typeName, int precision) { + checkUnsupportedType(typeName); + + return super.createSqlType(typeName, precision); + } + + /** {@inheritDoc} */ + @Override public RelDataType createSqlType(SqlTypeName typeName, int precision, int scale) { + checkUnsupportedType(typeName); + + return super.createSqlType(typeName, precision, scale); + } + + /** */ + private static void checkUnsupportedType(SqlTypeName typeName) { + String unsupportedTypeName = UNSUPPORTED_TYPES.get(typeName); + + if (unsupportedTypeName != null) + throw new IgniteException("Type '" + unsupportedTypeName + "' is not supported."); + } + /** {@inheritDoc} */ @Override public RelDataType createUnknownType() { // TODO workaround for https://issues.apache.org/jira/browse/CALCITE-5297 diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index e1c8f8dbfaba4..c0fc06977160a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -38,10 +38,16 @@ import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.SourceStringReader; @@ -64,6 +70,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl; import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext; import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; @@ -143,6 +150,61 @@ public static List intersect(Set set, List list) { .collect(Collectors.toList()); } + /** + * Finds the least restrictive type of the inputs and adds a cast projection if required. + * + * @param inputs Inputs to try to cast. + * @param cluster Cluster. + * @param traits Traits. + * @return Converted inputs. + */ + public static List castToLeastRestrictiveIfRequired(List inputs, RelOptCluster cluster, RelTraitSet traits) { + List inputRowTypes = inputs.stream().map(RelNode::getRowType).collect(Collectors.toList()); + + // Output type of a set operator is equal to leastRestrictive(inputTypes) (see SetOp::deriveRowType). + RelDataTypeFactory typeFactory = cluster.getTypeFactory(); + + RelDataType leastRestrictive = typeFactory.leastRestrictive(inputRowTypes); + + if (leastRestrictive == null) + throw new IllegalStateException("Cannot find least restrictive type for arguments to set op: " + inputRowTypes); + + // If input's type does not match the result type, then add a cast projection for non-matching fields. + RexBuilder rexBuilder = cluster.getRexBuilder(); + List newInputs = new ArrayList<>(inputs.size()); + + for (RelNode input : inputs) { + RelDataType inputRowType = input.getRowType(); + + // It is always safe to convert from [T1 nullable, T2 not nullable] to [T1 nullable, T2 nullable] and + // the least restrictive type does exactly that. + if (SqlTypeUtil.equalAsStructSansNullability(typeFactory, leastRestrictive, inputRowType, null)) { + newInputs.add(input); + + continue; + } + + List expressions = new ArrayList<>(inputRowType.getFieldCount()); + + for (int i = 0; i < leastRestrictive.getFieldCount(); i++) { + RelDataType fieldType = inputRowType.getFieldList().get(i).getType(); + + RelDataType outFieldType = leastRestrictive.getFieldList().get(i).getType(); + + RexNode ref = rexBuilder.makeInputRef(input, i); + + if (fieldType.equals(outFieldType)) + expressions.add(ref); + else + expressions.add(rexBuilder.makeCast(outFieldType, ref, true, false)); + } + + newInputs.add(new IgniteProject(cluster, traits, input, expressions, leastRestrictive)); + } + + return newInputs; + } + /** * Returns a given list as a typed list. */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java index 811f6c692d138..7fda6a03b6f48 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java @@ -145,6 +145,11 @@ public static boolean needCast(RelDataTypeFactory factory, RelDataType fromType, return false; } + // Currently, RelDataTypeFactoryImpl#CLASS_FAMILIES doesn't consider the byte type as an integer. + if ((fromType.getSqlTypeName() == SqlTypeName.TINYINT && SqlTypeUtil.isIntType(toType)) + || (toType.getSqlTypeName() == SqlTypeName.TINYINT && SqlTypeUtil.isIntType(fromType))) + return false; + // Implicit type coercion does not handle nullability. if (SqlTypeUtil.equalSansNullability(factory, fromType, toType)) return false; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java index e201c5cf2f425..6ca45427497c8 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java @@ -515,8 +515,11 @@ public void testCorrelatedNestedLoopJoin() { join.register(Arrays.asList(left, right)); + FilterNode filter = new FilterNode<>(ctx, joinRowType, r -> true); + filter.register(join); + RootNode root = new RootNode<>(ctx, joinRowType); - root.register(join); + root.register(filter); int cnt = 0; while (root.hasNext()) { diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java index d81d0993c01cd..9746f6c6644d7 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java @@ -145,10 +145,7 @@ protected QueryChecker assertQuery(Ignite ignite, String qry) { }; } - /** - * @deprecated Use {@link #sql(String, Object...)} instead. - */ - @Deprecated + /** */ protected List> executeSql(String sql, Object... args) { return executeSql(client, sql, args); } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java index 58abf9d962961..5d29bb82263f0 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTransactionalTest.java @@ -111,8 +111,8 @@ protected void init() throws Exception { client = startClientGrid("client"); } - /** {@inheritDoc} */ - @Override protected QueryChecker assertQuery(IgniteEx ignite, String qry) { + /** */ + protected QueryChecker assertQuery(IgniteEx ignite, String qry) { return new QueryChecker(qry, tx, sqlTxMode) { @Override protected QueryEngine getEngine() { return Commons.lookupComponent(ignite.context(), QueryEngine.class); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/DataTypesTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/DataTypesTest.java index 4dc3d9cde3e16..02d4a77a28dc7 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/DataTypesTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/DataTypesTest.java @@ -18,17 +18,21 @@ package org.apache.ignite.internal.processors.query.calcite.integration; import java.math.BigDecimal; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; import com.google.common.collect.ImmutableSet; import org.apache.calcite.runtime.CalciteException; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition; import org.apache.ignite.internal.util.typedef.F; import org.junit.Test; @@ -323,6 +327,28 @@ public void testBinarySql() { assertTrue(Objects.deepEquals(new byte[]{(byte)0xAA, (byte)0xBB, (byte)0xCC}, res.get(0).get(1))); } + /** */ + @Test + public void testUnsupportedTypes() { + assertThrows("CREATE TABLE test (val TIME WITH TIME ZONE)", IgniteException.class, + "'TIME WITH TIME ZONE' is not supported."); + assertThrows("CREATE TABLE test (val TIMESTAMP WITH TIME ZONE)", IgniteException.class, + "'TIMESTAMP WITH TIME ZONE' is not supported."); + assertThrows("CREATE TABLE test (val TIME WITH LOCAL TIME ZONE)", IgniteException.class, + "'TIME WITH LOCAL TIME ZONE' is not supported."); + assertThrows("CREATE TABLE test (val TIMESTAMP WITH LOCAL TIME ZONE)", IgniteException.class, + "'TIMESTAMP WITH LOCAL TIME ZONE' is not supported."); + + assertThrows("SELECT CAST (1 as TIME WITH TIME ZONE)", IgniteException.class, + "'TIME WITH TIME ZONE' is not supported."); + assertThrows("SELECT CAST (1 as TIMESTAMP WITH TIME ZONE)", IgniteException.class, + "'TIMESTAMP WITH TIME ZONE' is not supported."); + assertThrows("SELECT CAST (1 as TIME WITH LOCAL TIME ZONE)", IgniteException.class, + "'TIME WITH LOCAL TIME ZONE' is not supported."); + assertThrows("SELECT CAST (1 as TIMESTAMP WITH LOCAL TIME ZONE)", IgniteException.class, + "'TIMESTAMP WITH LOCAL TIME ZONE' is not supported."); + } + /** Cache API - SQL API cross check. */ @Test public void testBinaryCache() { @@ -421,6 +447,105 @@ public void testDecimalScale() { .check(); } + /** */ + @Test + public void testIsNotDistinctFromTypeConversion() { + SqlTypeName[] numerics = new SqlTypeName[] {SqlTypeName.TINYINT, SqlTypeName.SMALLINT, SqlTypeName.INTEGER, + SqlTypeName.BIGINT, SqlTypeName.DECIMAL, SqlTypeName.FLOAT, SqlTypeName.DOUBLE}; + + sql("CREATE TABLE t1(key1 INTEGER, i1idx INTEGER, i1 INTEGER, chr1 VARCHAR, PRIMARY KEY(key1))"); + sql("CREATE INDEX t1_idx ON t1(i1idx)"); + sql("INSERT INTO t1 VALUES (1, 1, null, '1'), (2, 2, 2, '22'), (3, 33, 3, null), (4, null, 4, '4')"); + + for (SqlTypeName type : numerics) { + String t = type.getName(); + + sql("CREATE TABLE t2(key2 " + t + ", i2idx " + t + ", i2 " + t + ", i3 INTEGER, chr2 VARCHAR, PRIMARY KEY(key2))"); + sql("CREATE INDEX t2_idx ON t2(i2idx)"); + sql("INSERT INTO t2 VALUES (0, 0, 0, null, '0'), (11, null, 1, 1, '1'), (2, 2, 2, 2, '22'), (3, 3, null, 3, null)"); + + for (HintDefinition hint : Arrays.asList(HintDefinition.MERGE_JOIN, HintDefinition.NL_JOIN, HintDefinition.CNL_JOIN)) { + String h = "/*+ " + hint.name() + " */ "; + + // Primary keys, indexed. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON key2 IS NOT DISTINCT FROM key1") + .returns(2, 2) + .returns(3, 3) + .check(); + + // Indexed and not indexed. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2idx IS NOT DISTINCT FROM i1") + .returns(1, 1) + .returns(2, 2) + .returns(3, 3) + .check(); + + // Both not indexed. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2 IS NOT DISTINCT FROM i1") + .returns(1, 3) + .returns(2, 2) + .check(); + + // Indexed and casted. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2idx IS NOT DISTINCT FROM CAST(chr1 as INTEGER)") + .returns(3, 1) + .check(); + + // Not indexed and casted. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2 IS NOT DISTINCT FROM CAST(chr1 as INTEGER)") + .returns(1, 1) + .returns(3, 3) + .check(); + + // @see MergeJoinConverterRule#matchesJoin(RelOptRuleCall) + if (hint == HintDefinition.MERGE_JOIN) + continue; + + // Primary keys, indexed. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON key2 IS DISTINCT FROM key1 and key1<2") + .returns(1, 1) + .returns(1, 2) + .returns(1, 3) + .returns(1, null) + .check(); + + // Indexed and not indexed. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2idx IS NOT DISTINCT FROM i1") + .returns(1, 1) + .returns(2, 2) + .returns(3, 3) + .check(); + + // Both not indexed. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2 IS DISTINCT FROM i1 and key1<3") + .returns(1, 1) + .returns(1, 2) + .returns(1, null) + .returns(2, 1) + .returns(2, 3) + .returns(2, null) + .check(); + + // Indexed and casted. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2idx IS DISTINCT FROM CAST(chr1 as INTEGER) and key1<2") + .returns(1, null) + .returns(1, 2) + .returns(1, 3) + .returns(1, 1) + .check(); + + // Not indexed and casted. + assertQuery("SELECT " + h + "key1, i3 FROM t1 JOIN t2 ON i2 IS DISTINCT FROM CAST(chr1 as INTEGER) and key1<2") + .returns(1, null) + .returns(1, 2) + .returns(1, 3) + .check(); + } + + sql("DROP TABLE t2"); + } + } + /** */ @Test public void testNumericConversion() { diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java index 584c967f03990..656e30333fb8c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.integration; +import java.util.Arrays; import java.util.List; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; @@ -516,4 +517,72 @@ public void testSetOpRewindability() { .returns(2) .check(); } + + /** */ + @Test + public void testNumbersCastInUnion() throws Exception { + doTestNumbersCastInSetOp("UNION", 10, 20, 30, 33, 40, 44, 50, null); + + doTestNumbersCastInSetOp("UNION ALL", 10, 20, 20, 30, 30, 33, 40, 44, 50, 50, 50, 50, null, null); + } + + /** */ + @Test + public void testNumbersCastInIntersect() throws Exception { + doTestNumbersCastInSetOp("INTERSECT", 20, 50, null); + + doTestNumbersCastInSetOp("INTERSECT ALL", 20, 50, 50, null); + } + + /** */ + @Test + public void testNumbersCastInExcept() throws Exception { + doTestNumbersCastInSetOp("EXCEPT", 30, 40); + + doTestNumbersCastInSetOp("EXCEPT ALL", 30, 30, 40); + } + + /** + * Tests 'SELECT TBL1.val SetOp TBL2.val' where TBL1 has `INT val` and TBL2 has 'val' of different numeric type. + * TBL1: 30, 20, 30, 40, 50, 50, null + * TBL2: 10, 20, 33, 44, 50, 50, null + * + * @param op Operation like 'UNION' or 'INTERSECT' + * @param expected Expected result as integers. + */ + private void doTestNumbersCastInSetOp(String op, Integer... expected) throws InterruptedException { + List types = F.asList("TINYINT", "SMALLINT", "INTEGER", "REAL", "FLOAT", "BIGINT", "DOUBLE", "DECIMAL"); + + sql(client, "CREATE TABLE t0(id INT PRIMARY KEY, val INTEGER) WITH \"affinity_key=id\""); + + try { + sql(client, "INSERT INTO t0 VALUES (1, 30), (2, 20), (3, 30), (4, 40), (5, 50), (6, 50), (7, null)"); + + for (String tblOpts : Arrays.asList("", " WITH \"template=replicated\"", " WITH \"affinity_key=aff\"")) { + for (String t2 : types) { + sql(client, "CREATE TABLE t1(id INT, aff INT, val " + t2 + ", PRIMARY KEY(id, aff))" + tblOpts); + + sql(client, "INSERT INTO t1 VALUES (1, 1, 10), (2, 1, 20), (3, 1, 33), (4, 2, 44), (5, 2, 50), " + + "(6, 3, 50), (7, 3, null)"); + + List> res = sql(client, "SELECT val from t0 " + op + " select val from t1 ORDER BY 1 NULLS LAST"); + + sql(client, "DROP TABLE t1"); + + assertEquals(expected.length, res.size()); + + for (int i = 0; i < expected.length; ++i) { + assertEquals(1, res.get(i).size()); + + assertEquals(expected[i], res.get(i).get(0) == null ? null : ((Number)res.get(i).get(0)).intValue()); + } + } + } + } + finally { + sql(client, "DROP TABLE t0"); + + awaitPartitionMapExchange(); + } + } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/DataTypesPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/DataTypesPlannerTest.java new file mode 100644 index 0000000000000..c2edafb8bf397 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/DataTypesPlannerTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Predicate; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.junit.Test; + +/** + * Planner test various types, casts and coercions. + */ +public class DataTypesPlannerTest extends AbstractPlannerTest { + /** Tests casts of numeric types in SetOps (UNION, EXCEPT, INTERSECT, etc.). */ + @Test + public void testSetOpNumbersCast() throws Exception { + List distrs = Arrays.asList(IgniteDistributions.single(), IgniteDistributions.random(), + IgniteDistributions.affinity(0, 1001, 0)); + + for (IgniteDistribution d1 : distrs) { + for (IgniteDistribution d2 : distrs) { + doTestSetOpNumbersCast(d1, d2, true, true); + + doTestSetOpNumbersCast(d1, d2, false, true); + + doTestSetOpNumbersCast(d1, d2, false, false); + } + } + } + + /** */ + private void doTestSetOpNumbersCast( + IgniteDistribution distr1, + IgniteDistribution distr2, + boolean nullable1, + boolean nullable2 + ) throws Exception { + IgniteSchema schema = new IgniteSchema(DEFAULT_SCHEMA); + + IgniteTypeFactory f = Commons.typeFactory(); + + SqlTypeName[] numTypes = new SqlTypeName[] {SqlTypeName.TINYINT, SqlTypeName.SMALLINT, SqlTypeName.REAL, + SqlTypeName.FLOAT, SqlTypeName.INTEGER, SqlTypeName.BIGINT, SqlTypeName.DOUBLE, SqlTypeName.DECIMAL}; + + boolean notNull = !nullable1 && !nullable2; + + for (SqlTypeName t1 : numTypes) { + for (SqlTypeName t2 : numTypes) { + RelDataType type = new RelDataTypeFactory.Builder(f) + .add("C1", f.createTypeWithNullability(f.createSqlType(t1), nullable1)) + .add("C2", f.createTypeWithNullability(f.createSqlType(SqlTypeName.VARCHAR), true)) + .build(); + + createTable(schema, "TABLE1", type, distr1, null); + + type = new RelDataTypeFactory.Builder(f) + .add("C1", f.createTypeWithNullability(f.createSqlType(t2), nullable2)) + .add("C2", f.createTypeWithNullability(f.createSqlType(SqlTypeName.VARCHAR), true)) + .build(); + + createTable(schema, "TABLE2", type, distr2, null); + + for (String op : Arrays.asList("UNION", "INTERSECT", "EXCEPT")) { + String sql = "SELECT * FROM table1 " + op + " SELECT * FROM table2"; + + if (t1 == t2 && (!nullable1 || !nullable2)) + assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteProject.class)).negate()); + else { + RelDataType targetT = f.leastRestrictive(Arrays.asList(f.createSqlType(t1), f.createSqlType(t2))); + + assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(SetOp.class) + .and(t1 == targetT.getSqlTypeName() ? input(0, nodeOrAnyChild(isInstanceOf(IgniteProject.class)).negate()) + : input(0, projectFromTable("TABLE1", "CAST($0):" + targetT + (notNull ? " NOT NULL" : ""), "$1"))) + .and(t2 == targetT.getSqlTypeName() ? input(1, nodeOrAnyChild(isInstanceOf(IgniteProject.class)).negate()) + : input(1, projectFromTable("TABLE2", "CAST($0):" + targetT + (notNull ? " NOT NULL" : ""), "$1"))) + )); + } + } + } + } + } + + /** */ + protected Predicate projectFromTable(String tableName, String... exprs) { + return nodeOrAnyChild( + isInstanceOf(IgniteProject.class) + .and(projection -> { + String actualProj = projection.getProjects().toString(); + + String expectedProj = Arrays.asList(exprs).toString(); + + return actualProj.equals(expectedProj); + }) + .and(input(nodeOrAnyChild(isTableScan(tableName)))) + ); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java index 372583f89f316..53ff68e605e69 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedSubqueryPlannerTest; +import org.apache.ignite.internal.processors.query.calcite.planner.DataTypesPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.IndexRebuildPlannerTest; @@ -67,6 +68,7 @@ TableFunctionPlannerTest.class, TableDmlPlannerTest.class, UnionPlannerTest.class, + DataTypesPlannerTest.class, JoinCommutePlannerTest.class, LimitOffsetPlannerTest.class, MergeJoinPlannerTest.class, diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 701caa3f237c0..b6dfcddb9f736 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -188,12 +188,12 @@ public interface IgniteCache extends javax.cache.Cache, IgniteAsyncS *

* Full list of repairable methods: *

    - *
  • {@link IgniteCache#containsKey} && {@link IgniteCache#containsKeyAsync}
  • - *
  • {@link IgniteCache#containsKeys} && {@link IgniteCache#containsKeysAsync}
  • - *
  • {@link IgniteCache#getEntry} && {@link IgniteCache#getEntryAsync}
  • - *
  • {@link IgniteCache#getEntries} && {@link IgniteCache#getEntriesAsync}
  • - *
  • {@link IgniteCache#get} && {@link IgniteCache#getAsync}
  • - *
  • {@link IgniteCache#getAll} && {@link IgniteCache#getAllAsync}
  • + *
  • {@link IgniteCache#containsKey} && {@link IgniteCache#containsKeyAsync}
  • + *
  • {@link IgniteCache#containsKeys} && {@link IgniteCache#containsKeysAsync}
  • + *
  • {@link IgniteCache#getEntry} && {@link IgniteCache#getEntryAsync}
  • + *
  • {@link IgniteCache#getEntries} && {@link IgniteCache#getEntriesAsync}
  • + *
  • {@link IgniteCache#get} && {@link IgniteCache#getAsync}
  • + *
  • {@link IgniteCache#getAll} && {@link IgniteCache#getAllAsync}
  • *
* @param strategy Read Repair strategy. * @return Cache with explicit consistency check on each read and repair if necessary. @@ -226,7 +226,7 @@ public interface IgniteCache extends javax.cache.Cache, IgniteAsyncS * (which will be stored in binary format), you should acquire following projection * to avoid deserialization: *
-     * IgniteCache prj = cache.withKeepBinary();
+     * IgniteCache<Integer, BinaryObject> prj = cache.withKeepBinary();
      *
      * // Value is not deserialized and returned in binary format.
      * BinaryObject po = prj.get(1);
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index eb1dd7bd74b7b..6e320ab516a49 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -126,7 +126,7 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
      * Returned result is collection of tuples. Each tuple corresponds to one node start attempt and
      * contains hostname, success flag and error message if attempt was not successful. Note that
      * successful attempt doesn't mean that node was actually started and joined topology. For large
-     * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual
+     * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual
      * node logs for details.
      * 

* Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -157,7 +157,7 @@ public Collection startNodes(File file, boolean restart, * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and * contains hostname, success flag and error message if attempt was not successful. Note that * successful attempt doesn't mean that node was actually started and joined topology. For large - * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual * node logs for details. * * @param file Configuration file. @@ -254,7 +254,7 @@ public IgniteFuture> startNodesAsync(File fil * Returned result is collection of tuples. Each tuple corresponds to one node start attempt and * contains hostname, success flag and error message if attempt was not successful. Note that * successful attempt doesn't mean that node was actually started and joined topology. For large - * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual * node logs for details. *

* Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -356,7 +356,7 @@ public Collection startNodes(Collection 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual * node logs for details. * * @param hosts Startup parameters. diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java index c0aa637e7558c..7323471dd17cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java @@ -101,7 +101,7 @@ public class IgniteJdbcThinDataSource implements DataSource, Serializable { } /** - * Different application servers us different format (URL & url). + * Different application servers us different format (URL & url). * @return Connection URL. */ public String getURL() { @@ -109,7 +109,7 @@ public String getURL() { } /** - * Different application servers us different format (URL & url). + * Different application servers us different format (URL & url). * @param url Connection URL. * @throws SQLException On error whrn URL is invalid. */ diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index d8b89ec87f1a6..b0af9955865f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1698,7 +1698,7 @@ public final class IgniteSystemProperties { /** * Flag to disable memory optimization: * BitSets instead of HashSets to store partitions. - * When number of backups per partion is > IGNITE_AFFINITY_BACKUPS_THRESHOLD we use HashMap to improve contains() + * When number of backups per partion is > IGNITE_AFFINITY_BACKUPS_THRESHOLD we use HashMap to improve contains() * which leads to extra memory consumption, otherwise we use view on the * list of cluster nodes to reduce memory consumption on redundant data structures. */ diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java index cd1a979802e5e..08184abe67079 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheEntry.java @@ -44,11 +44,11 @@ * *

Java Example

*
- * IgniteCache cache = grid(0).cache(null);
+ * IgniteCache<Integer, String> cache = grid(0).cache(null);
  *
- * CacheEntry entry1 = cache.invoke(100,
- *     new EntryProcessor>() {
- *          public CacheEntry process(MutableEntry entry,
+ * CacheEntry<String, Integer> entry1 = cache.invoke(100,
+ *     new EntryProcessor<Integer, String, CacheEntry<String, Integer>>() {
+ *          public CacheEntry<String, Integer> process(MutableEntry<Integer, String> entry,
  *              Object... arguments) throws EntryProcessorException {
  *                  return entry.unwrap(CacheEntry.class);
  *          }
@@ -56,16 +56,16 @@
  *
  * // Cache entry for the given key may be updated at some point later.
  *
- * CacheEntry entry2 = cache.invoke(100,
- *     new EntryProcessor>() {
- *          public CacheEntry process(MutableEntry entry,
+ * CacheEntry<String, Integer> entry2 = cache.invoke(100,
+ *     new EntryProcessor<Integer, String, CacheEntry<String, Integer>>() {
+ *          public CacheEntry<String, Integer> process(MutableEntry<Integer, String> entry,
  *              Object... arguments) throws EntryProcessorException {
  *                  return entry.unwrap(CacheEntry.class);
  *          }
  *     });
  *
  * // Comparing entries' versions.
- * if (entry1.version().compareTo(entry2.version()) < 0) {
+ * if (entry1.version().compareTo(entry2.version()) < 0) {
  *     // the entry has been updated
  * }
  * 
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java index 5850aa319d505..12d3d92e3d565 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java @@ -560,7 +560,7 @@ public Map getAliases() { /** * Sets mapping from full property name in dot notation to an alias that will be used as SQL column name. - * Example: {"parent.name" -> "parentName"}. + * Example: {"parent.name" -> "parentName"}. * * @param aliases Aliases map. * @return {@code this} for chaining. diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index f89836519efd0..ef76a8c2bf940 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -66,12 +66,12 @@ * ContinuousQuery<Long, Person> qry = new ContinuousQuery<>(); * * // Initial iteration query will return all people with salary above 1000. - * qry.setInitialQuery(new ScanQuery<>((id, p) -> p.getSalary() > 1000)); + * qry.setInitialQuery(new ScanQuery<>((id, p) -> p.getSalary() > 1000)); * * * // Callback that is called locally when update notifications are received. * // It simply prints out information about all created or modified records. - * qry.setLocalListener((evts) -> { + * qry.setLocalListener((evts) -> { * for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) { * Person p = e.getValue(); * @@ -80,7 +80,7 @@ * }); * * // The continuous listener will be notified for people with salary above 1000. - * qry.setRemoteFilter(evt -> evt.getValue().getSalary() > 1000); + * qry.setRemoteFilter(evt -> evt.getValue().getSalary() > 1000); * * // Execute the query and get a cursor that iterates through the initial data. * QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry); diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index 26c9c725b92cf..0d33456cc070b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -1943,7 +1943,7 @@ protected JdbcTypeField[] valueColumns() { /** * Get full table name. * - * @return <schema>.<table name> + * @return <schema>.<table name> */ protected String fullTableName() { return fullTblName; diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java index 413b8e2c00382..cdccc13450d68 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java +++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java @@ -827,7 +827,7 @@ public IgniteClientFuture>> invokeAllAsync( * (which will be stored in binary format), you should acquire following projection * to avoid deserialization: *
-     * CacheClient prj = cache.withKeepBinary();
+     * CacheClient<Integer, BinaryObject> prj = cache.withKeepBinary();
      *
      * // Value is not deserialized and returned in binary format.
      * BinaryObject po = prj.get(1);
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 5b84d9f1fc4dd..f9f081c6f2461 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -39,13 +39,13 @@
  * {@link IgniteConfiguration#getUserAttributes()} method to initialize your custom
  * node attributes at startup. Here is an example of how to assign an attribute to a node at startup:
  * 
- * <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ * <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  *     ...
- *     <property name="userAttributes">
- *         <map>
- *             <entry key="worker" value="true"/>
- *         </map>
- *     </property>
+ *     <property name="userAttributes">
+ *         <map>
+ *             <entry key="worker" value="true"/>
+ *         </map>
+ *     </property>
  *     ...
  * </bean>
  * 
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuationAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuationAdapter.java index 60478a7ed5cdf..fa502fa1896ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuationAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuationAdapter.java @@ -46,7 +46,7 @@ * protected Collection<? extends ComputeJob> split(int gridSize, final String arg) throws IgniteCheckedException { * List<ComputeJobAdapter<String>> jobs = new ArrayList<ComputeJobAdapter<String>>(gridSize); * - * for (int i = 0; i < gridSize; i++) { + * for (int i = 0; i < gridSize; i++) { * jobs.add(new ComputeJobAdapter() { * // Job execution logic. * public Object execute() throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java index a19af07916c37..6efc4ecbf4ee9 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java @@ -39,10 +39,10 @@ * is transparent to your code and is handled automatically by the adapter. * Here is an example of how your task will look: *
- * public class MyFooBarTask extends ComputeTaskSplitAdapter<String> {
+ * public class MyFooBarTask extends ComputeTaskSplitAdapter<String> {
  *     @Override
- *     protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException {
- *         List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize);
+ *     protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteCheckedException {
+ *         List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize);
  *
  *         for (int i = 0; i < gridSize; i++) {
  *             jobs.add(new MyFooBarJob(arg));
@@ -61,14 +61,14 @@
  * case we manually inject load balancer and use it to pick the best node. Doing it in
  * such way would allow user to map some jobs manually and for others use load balancer.
  * 
- * public class MyFooBarTask extends ComputeTaskAdapter<String, String> {
+ * public class MyFooBarTask extends ComputeTaskAdapter<String, String> {
  *     // Inject load balancer.
  *     @LoadBalancerResource
  *     ComputeLoadBalancer balancer;
  *
  *     // Map jobs to grid nodes.
- *     public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException {
- *         Map<MyFooBarJob, ClusterNode> jobs = new HashMap<MyFooBarJob, ClusterNode>(subgrid.size());
+ *     public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException {
+ *         Map<MyFooBarJob, ClusterNode> jobs = new HashMap<MyFooBarJob, ClusterNode>(subgrid.size());
  *
  *         // In more complex cases, you can actually do
  *         // more complicated assignments of jobs to nodes.
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskContinuousMapper.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskContinuousMapper.java
index 39bed82cfcfe8..d6a273ae6d3da 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskContinuousMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskContinuousMapper.java
@@ -48,7 +48,7 @@
  * 
  * ...
  * // This field will be injected with task continuous mapper.
- * @TaskContinuousMapperResource
+ * {@literal @}TaskContinuousMapperResource
  * private ComputeTaskContinuousMapper mapper;
  * ...
  * 
@@ -56,7 +56,7 @@ *
  * // This setter method will be automatically called by the system
  * // to set grid task continuous mapper.
- * @TaskContinuousMapperResource
+ * {@literal @}TaskContinuousMapperResource
  * void setSession(ComputeTaskContinuousMapper mapper) {
  *     this.mapper = mapper;
  * }
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
index 3e017c69b6f22..7d1179f219126 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
@@ -85,7 +85,7 @@
  * 
  * ...
  * // This field will be injected with distributed task session.
- * @TaskSessionResource
+ * {@literal @}TaskSessionResource
  * private ComputeTaskSession ses;
  * ...
  * 
@@ -93,7 +93,7 @@ *
  * // This setter method will be automatically called by the system
  * // to set grid task session.
- * @TaskSessionResource
+ * {@literal @}TaskSessionResource
  * void setSession(ComputeTaskSession ses) {
  *     this.ses = ses;
  * }
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DeploymentMode.java b/modules/core/src/main/java/org/apache/ignite/configuration/DeploymentMode.java
index 78bc718076a97..030ba9be65258 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DeploymentMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DeploymentMode.java
@@ -45,7 +45,7 @@
  *    <!-- User version. -->
  *    <bean id="userVersion" class="java.lang.String">
  *        <constructor-arg value="0"/>
- *    </bean>
+ *    </bean>
  * 
* By default, all ignite startup scripts ({@code ignite.sh} or {@code ignite.bat}) * pick up user version from {@code IGNITE_HOME/config/userversion} folder. Usually, it diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/SqlConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/SqlConfiguration.java index 382f3b8b524f1..4dd80e3e09ebf 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/SqlConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/SqlConfiguration.java @@ -158,18 +158,18 @@ public SqlConfiguration setLongQueryWarningTimeout(long longQryWarnTimeout) { } /** - * Is key & value validation enabled. + * Is key & value validation enabled. * - * @return {@code true} When key & value shall be validated against SQL schema. + * @return {@code true} When key & value shall be validated against SQL schema. */ public boolean isValidationEnabled() { return validationEnabled; } /** - * Enable/disable key & value validation. + * Enable/disable key & value validation. * - * @param validationEnabled {@code true} When key & value shall be validated against SQL schema. + * @param validationEnabled {@code true} When key & value shall be validated against SQL schema. * Default value is {@code false}. * @return {@code this} for chaining. */ diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java index 1e04ce6837981..f712a02901132 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -85,7 +85,7 @@ * * // Callback that is called locally when update notifications are received. * // It simply prints out information about all created persons. - * qry.setLocalListener((evts) -> { + * qry.setLocalListener((evts) -> { * for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) { * Person p = e.getValue(); * @@ -94,7 +94,7 @@ * }); * * // Sets remote filter. - * qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter()); + * qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter()); * * // Execute query. * QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry); diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java index 81ee7fa0538c5..36d4e055c26e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java @@ -27,7 +27,7 @@ */ public interface MarshallerContext { /** - * Method to register typeId->class name mapping in marshaller context cluster-wide. + * Method to register typeId->class name mapping in marshaller context <b>cluster-wide</b>. * * This method guarantees that mapping is delivered to all nodes in cluster * and blocks caller thread until then. @@ -68,7 +68,7 @@ public boolean registerClassName( ) throws IgniteCheckedException; /** - * Method to register typeId->class name mapping in marshaller context on local node only. + * Method to register typeId->class name mapping in marshaller context <b>on local node only</b>. * * No guarantees that the mapping is presented on other nodes are provided. * diff --git a/modules/core/src/main/java/org/apache/ignite/package-info.java b/modules/core/src/main/java/org/apache/ignite/package-info.java index 25da5efc6e265..20e49ffef4b32 100644 --- a/modules/core/src/main/java/org/apache/ignite/package-info.java +++ b/modules/core/src/main/java/org/apache/ignite/package-info.java @@ -16,7 +16,7 @@ */ /** - * Contains entry-point Ignite & HPC APIs. + * Contains entry-point Ignite & HPC APIs. */ package org.apache.ignite; diff --git a/modules/core/src/main/java/org/apache/ignite/services/ServiceCallInterceptor.java b/modules/core/src/main/java/org/apache/ignite/services/ServiceCallInterceptor.java index 14050797988da..4922ecab9c0a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/services/ServiceCallInterceptor.java +++ b/modules/core/src/main/java/org/apache/ignite/services/ServiceCallInterceptor.java @@ -34,7 +34,7 @@ *

* Usage example: *

- * ServiceCallInterceptor security = (mtd, args, ctx, svcCall) -> {
+ * ServiceCallInterceptor security = (mtd, args, ctx, svcCall) -> {
  *     if (!CustomSecurityProvider.get().access(mtd, ctx.currentCallContext().attribute("sessionId")))
  *         throw new SecurityException("Method invocation is not permitted");
  *
@@ -42,7 +42,7 @@
  *     return svcCall.call();
  * };
  *
- * ServiceCallInterceptor audit = (mtd, args, ctx, svcCall) -> {
+ * ServiceCallInterceptor audit = (mtd, args, ctx, svcCall) -> {
  *     String sessionId = ctx.currentCallContext().attribute("sessionId");
  *     AuditProvider prov = AuditProvider.get();
  *
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index d5129317815c8..b95bf22427d56 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -74,6 +74,17 @@ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp,
         }
     }
 
+    /**
+     * Creates timeout helper with an absolute time threshold. Sets {@code timeoutEnabled} to {@code false}.
+     *
+     * @param timeout Timeout in milliseconds.
+     */
+    public IgniteSpiOperationTimeoutHelper(long timeout) {
+        timeoutEnabled = false;
+
+        timeoutThreshold = System.nanoTime() + U.millisToNanos(timeout);
+    }
+
     /**
      * Returns a timeout value to use for the next network operation.
      *
@@ -111,7 +122,7 @@ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutE
      * @param e Exception to check.
      * @return {@code True} if given exception is a timeout. {@code False} otherwise.
      */
-    public boolean checkFailureTimeoutReached(Exception e) {
+    public static boolean checkFailureTimeoutReached(Exception e) {
         return X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e02d591550fb4..57aeb328bddba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -846,7 +846,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
                     break;
                 }
 
-                if (timeoutHelper.checkFailureTimeoutReached(e))
+                if (IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0ba98ae709756..acbccbb09a6e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -787,7 +787,7 @@ private boolean pingNode(TcpDiscoveryNode node) {
         for (InetSocketAddress addr : spi.getEffectiveNodeAddresses(node)) {
             try {
                 // ID returned by the node should be the same as ID of the parameter for ping to succeed.
-                IgniteBiTuple t = pingNode(addr, node.id(), clientNodeId);
+                IgniteBiTuple t = pingNode(addr, node.id(), clientNodeId, 0);
 
                 if (t == null)
                     // Remote node left topology.
@@ -818,18 +818,25 @@ private boolean pingNode(TcpDiscoveryNode node) {
      * @param addr Address of the node.
      * @param nodeId Node ID to ping. In case when client node ID is not null this node ID is an ID of the router node.
      * @param clientNodeId Client node ID.
+     * @param timeout Timeout on operation in milliseconds. If 0, a value based on {@link TcpDiscoverySpi} is used.
      * @return ID of the remote node and "client exists" flag if node alive or {@code null} if the remote node has
      *         left a topology during the ping process.
      * @throws IgniteCheckedException If an error occurs.
      */
-    @Nullable private IgniteBiTuple pingNode(InetSocketAddress addr, @Nullable UUID nodeId,
-        @Nullable UUID clientNodeId) throws IgniteCheckedException {
+    @Nullable private IgniteBiTuple pingNode(
+        InetSocketAddress addr,
+        @Nullable UUID nodeId,
+        @Nullable UUID clientNodeId,
+        long timeout
+    ) throws IgniteCheckedException {
         assert addr != null;
+        assert timeout >= 0;
 
-        UUID locNodeId = getLocalNodeId();
+        IgniteSpiOperationTimeoutHelper timeoutHelper = timeout == 0
+            ? new IgniteSpiOperationTimeoutHelper(spi, clientNodeId == null)
+            : new IgniteSpiOperationTimeoutHelper(timeout);
 
-        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi,
-            clientNodeId == null);
+        UUID locNodeId = getLocalNodeId();
 
         if (F.contains(spi.locNodeAddrs, addr)) {
             if (clientNodeId == null)
@@ -928,9 +935,10 @@ private boolean pingNode(TcpDiscoveryNode node) {
                             break;
                         }
 
-                        if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) {
+                        if (IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e)
+                            && (spi.failureDetectionTimeoutEnabled() || timeout != 0)) {
                             log.warning("Failed to ping node [nodeId=" + nodeId + "]. Reached the timeout " +
-                                spi.failureDetectionTimeout() + "ms. Cause: " + e.getMessage());
+                                (timeout == 0 ? spi.failureDetectionTimeout() : timeout) + "ms. Cause: " + e.getMessage());
 
                             break;
                         }
@@ -1579,7 +1587,7 @@ else if (U.millisSinceNanos(joinStartNanos) > spi.joinTimeout)
                     break;
                 }
 
-                if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
@@ -2260,7 +2268,7 @@ private void cleanIpFinder() {
 
                         if (res == null) {
                             try {
-                                res = pingNode(addr, null, null) != null;
+                                res = pingNode(addr, null, null, 0) != null;
                             }
                             catch (IgniteCheckedException e) {
                                 if (log.isDebugEnabled())
@@ -3630,7 +3638,7 @@ else if (log.isTraceEnabled())
                                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
                                     break;
 
-                                if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                                if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                                     break;
                                 else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
                                     SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -3801,7 +3809,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (spi.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e))
+                            if (spi.failureDetectionTimeoutEnabled() && IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                                 break;
 
                             if (!spi.failureDetectionTimeoutEnabled()) {
@@ -7246,8 +7254,17 @@ private void ringMessageReceived() {
             lastRingMsgReceivedTime = System.nanoTime();
         }
 
-        /** @return Alive address if was able to connected to. {@code Null} otherwise. */
+        /**
+         * Asynchronously searches for an alive address of a node using a maximal timeout.
+         *
+         * @param node Node to ping.
+         * @param timeout Overall operation timeout.
+         * @return An address successfully connected to. {@code Null} if no alive address was detected within the timeout.
+         * @see #pingNode(InetSocketAddress, UUID, UUID, long)
+         */
         private InetSocketAddress checkConnection(TcpDiscoveryNode node, int timeout) {
+            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(timeout);
+
             AtomicReference liveAddrHolder = new AtomicReference<>();
 
             List addrs = new ArrayList<>(spi.getEffectiveNodeAddresses(node));
@@ -7277,11 +7294,14 @@ private InetSocketAddress checkConnection(TcpDiscoveryNode node, int timeout) {
                         for (int i = 0; i < addrsToCheck; ++i) {
                             InetSocketAddress addr = addrs.get(addrIdx.getAndIncrement());
 
-                            try (Socket sock = new Socket()) {
+                            try {
                                 if (liveAddrHolder.get() == null) {
-                                    sock.connect(addr, perAddrTimeout);
+                                    UUID id = pingNode(addr, node.id(), null, timeoutHelper.nextTimeoutChunk(perAddrTimeout)).get1();
+
+                                    assert id == null || id.equals(node.id());
 
-                                    liveAddrHolder.compareAndSet(null, addr);
+                                    if (id != null)
+                                        liveAddrHolder.compareAndSet(null, addr);
                                 }
                             }
                             catch (Exception e) {
@@ -7514,7 +7534,7 @@ private boolean pingJoiningNode(TcpDiscoveryNode node) {
             for (InetSocketAddress addr : spi.getEffectiveNodeAddresses(node)) {
                 try {
                     if (!(addr.getAddress().isLoopbackAddress() && locNode.socketAddresses().contains(addr))) {
-                        IgniteBiTuple t = pingNode(addr, node.id(), null);
+                        IgniteBiTuple t = pingNode(addr, node.id(), null, 0);
 
                         if (t != null)
                             return true;
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java
index 4b98f75b0b004..751217899a71c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java
@@ -123,7 +123,7 @@
  *         if (useAvg) {
  *             double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
  *
- *             if (load > 0) {
+ *             if (load > 0) {
  *                 return load;
  *             }
  *         }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java
index 0b2bc4d16a895..61a282e8659cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java
@@ -52,7 +52,7 @@
  *         if (useAvg) {
  *             double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
  *
- *             if (load > 0) {
+ *             if (load > 0) {
  *                 return load;
  *             }
  *         }
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index 5e6d9f61f5b1a..5b36dff2c71b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -107,7 +107,7 @@
  *     Integer v1 = cache.get("k1");
  *
  *     // Check if v1 satisfies some condition before doing a put.
- *     if (v1 != null && v1 > 0)
+ *     if (v1 != null && v1 > 0)
  *         cache.put("k1", 2);
  *
  *     cache.remove("k2");
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index ea4ca2f035301..7162083af9cd2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -29,14 +29,18 @@
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -124,6 +128,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest {
     /** */
     private String localhost;
 
+    /** */
+    private IgniteLogger gridLog;
+
     /** */
     private final GridConcurrentHashSet segmentedNodes = new GridConcurrentHashSet<>();
 
@@ -156,6 +163,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest {
 
         cfg.setLocalHost(localhost);
 
+        if (gridLog != null)
+            cfg.setGridLogger(gridLog);
+
         return cfg;
     }
 
@@ -249,6 +259,92 @@ public void testBackwardNodeCheckWithSameLoopbackSeveralLocalAddresses() throws
         doTestBackwardNodeCheckWithSameLoopback("0.0.0.0");
     }
 
+    /**
+     * Tests backward ping when the discovery threads of the malfunction node is simulated to hang at GC.
+     * But the JVM is able to accept socket connections.
+     */
+    @Test
+    public void testBackwardConnectionCheckWhenDiscoveryThreadsSuspended() throws Exception {
+        ListeningTestLogger testLog = new ListeningTestLogger(log);
+
+        gridLog = testLog;
+
+        localhost = "127.0.0.1";
+
+        failureDetectionTimeout = 3000;
+
+        specialSpi = new TestDiscoverySpi();
+
+        // This node suspects its next failed.
+        Ignite doubtNode0 = startGrid(0);
+
+        // Simulates frozen threads on node 1 but answering sockets. I.e. Socket#connect() works to node 1 but
+        // reading anything with Socket#read() from it would fail with the timeout.
+        specialSpi = new TestDiscoverySpi();
+
+        // Node simulated 'frozen'. Can accept connections (socket accept) but won't write anything to a discovery socket.
+        Ignite frozenNode1 = startGrid(1);
+
+        UUID frozenNodeId = frozenNode1.cluster().localNode().id();
+
+        specialSpi = new TestDiscoverySpi();
+
+        setLoggerDebugLevel();
+
+        // Node which does the backward connection check to its previous 'frozen'.
+        Ignite pingingNode2 = startGrid(2);
+
+        LogListener node1SegmentedLogLsnr = LogListener.matches("Local node SEGMENTED: TcpDiscoveryNode [id=" + frozenNode1).build();
+
+        // Node1 must leave the cluster.
+        LogListener backwardPingLogLsnr = LogListener.matches("Remote node requests topology change. Checking connection to " +
+            "previous [TcpDiscoveryNode [id=" + frozenNodeId).build();
+
+        testLog.registerListener(node1SegmentedLogLsnr);
+        testLog.registerListener(backwardPingLogLsnr);
+
+        // Result of the ping from node2 ot node1.
+        AtomicReference backwardPingResult = new AtomicReference<>();
+
+        // Request to establish new permanent cluster connection from doubting node0 to node2.
+        testSpi(doubtNode0).hsRqLsnr.set((s, hsRq) -> {
+            if (hsRq.changeTopology() && frozenNodeId.equals(hsRq.checkPreviousNodeId())) {
+                // Continue simulation of node1 freeze at GC and processes no discovery messages.
+                testSpi(frozenNode1).addrsToBlock = Collections.emptyList();
+            }
+        });
+
+        // Response from node2 to node0 with negative check of frozen node1.
+        testSpi(pingingNode2).hsRespLsnr.set((s, hsResp) -> {
+            backwardPingResult.set(hsResp.previousNodeAlive());
+        });
+
+        // Begin simulation of node1 freeze at GC and processes no discovery messages and wait till
+        // the discovery traffic node0->node1 stops.
+        testSpi(doubtNode0).addrsToBlock = spi(frozenNode1).locNodeAddrs;
+        assertTrue(waitForCondition(() -> testSpi(doubtNode0).blocked, getTestTimeout()));
+
+        // Wait till the discovery traffic node1->node2 stops too.
+        assertTrue(waitForCondition(() -> testSpi(frozenNode1).blocked, getTestTimeout()));
+
+        // Wait till the backward connection check and ensure the result is negative (node1 confirmed failed).
+        assertTrue(backwardPingLogLsnr.check(getTestTimeout()));
+        assertTrue(waitForCondition(() -> backwardPingResult.get() != null, getTestTimeout()));
+
+        assertFalse(backwardPingResult.get());
+
+        assertTrue(backwardPingLogLsnr.check(getTestTimeout()));
+
+        // Node0 and node2 must survive.
+        assertTrue(waitForCondition(() -> doubtNode0.cluster().nodes().size() == 2
+                && !doubtNode0.cluster().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet()).contains(frozenNodeId),
+            getTestTimeout()));
+
+        assertTrue(waitForCondition(() -> pingingNode2.cluster().nodes().size() == 2
+                && !pingingNode2.cluster().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet()).contains(frozenNodeId),
+            getTestTimeout()));
+    }
+
     /**
      * Performs Tests backward node ping if {@link TcpDiscoveryNode#socketAddresses()} contains same loopback address as of local node.
      * Assumes several local address are resolved.
@@ -396,18 +492,17 @@ private void simulateFailureOfTwoNodes(boolean sequentionally) throws Exception
      */
     @Test
     public void testBackwardConnectionCheckFailedLogMessage() throws Exception {
+        startGrid(0);
+
         ListeningTestLogger testLog = new ListeningTestLogger(log);
 
         LogListener lsnr0 = LogListener.matches("Failed to check connection to previous node").times(2).build();
 
         testLog.registerListener(lsnr0);
 
-        startGrid(0);
-
-        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1));
-        cfg.setGridLogger(testLog);
+        gridLog = testLog;
 
-        startGrid(cfg);
+        startGrid(1);
 
         startGrid(2);
 
@@ -507,15 +602,6 @@ private static final class TestDiscoverySpi extends TcpDiscoverySpi {
                 impl = new ServerImpl(this, 1);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
-            long timeout) throws IOException {
-            if (dropMsg(sock))
-                return;
-
-            super.writeToSocket(msg, sock, res, timeout);
-        }
-
         /** */
         private boolean dropMsg(Socket sock) {
             Collection addrsToBlock = this.addrsToBlock;
@@ -531,6 +617,15 @@ private boolean dropMsg(Socket sock) {
             return false;
         }
 
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (dropMsg(sock))
+                return;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
diff --git a/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java b/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
index 2303473b44811..37450a0b64a3c 100644
--- a/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
+++ b/modules/log4j2/src/main/java/org/apache/ignite/logger/log4j2/Log4J2Logger.java
@@ -67,7 +67,7 @@
  *      <property name="gridLogger">
  *          <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
  *              <constructor-arg type="java.lang.String" value="config/ignite-log4j.xml"/>
- *          </bean>
+ *          </bean>
  *      </property>
  * 
* and from your code: diff --git a/parent/pom.xml b/parent/pom.xml index 395df17f16d64..40b170b9f3f6f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -105,7 +105,7 @@ 42.7.3 1.7.33 1.1.10.4 - 5.2.25.RELEASE + 5.3.39 3.1.2 9.0.63 0.8.3 diff --git a/pom.xml b/pom.xml index d220357d6ab13..a11ecf398489c 100644 --- a/pom.xml +++ b/pom.xml @@ -150,6 +150,7 @@ validate + true