IGNITE-22501 Sql. Batching DDL statement for scripts #5143

Open · wants to merge 1 commit into base: main
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.benchmark;

import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.TestIgnitionManager.PRODUCTION_CLUSTER_CONFIG_STRING;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
* Benchmark that measures execution of a DDL script on a single-node cluster with the default configuration.
*/
@State(Scope.Benchmark)
@Fork(1)
@Threads(1)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 7, time = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@SuppressWarnings({"WeakerAccess", "unused"})
public class DdlBatchingBenchmark extends AbstractMultiNodeBenchmark {
private static final String DROP_TABLE_TEMPLATE = "DROP TABLE IF EXISTS table_{};";
private static final String CREATE_TABLE_TEMPLATE = "CREATE TABLE table_{} (id INT PRIMARY KEY, c_1 INT, c_2 INT, c_3 INT, c_4 INT);";
private static final String CREATE_INDEX_TEMPLATE = "CREATE INDEX table_{}_c_{}_ind ON table_{} (c_{});";

@Param({"1", "2", "3"})
private int numberOfTables;

@Param({"0", "2", "4"})
private int numberOfSecondaryIndexesPerTable;

private IgniteSql sql;
private String scriptText;

/** Initializes script text. */
@Setup
public void setUp() throws IOException {
sql = publicIgnite.sql();

StringBuilder sb = new StringBuilder();

for (int t = 0; t < numberOfTables; t++) {
sb.append(format(DROP_TABLE_TEMPLATE, t)).append(System.lineSeparator());
sb.append(format(CREATE_TABLE_TEMPLATE, t)).append(System.lineSeparator());

for (int i = 1; i <= numberOfSecondaryIndexesPerTable; i++) {
sb.append(format(CREATE_INDEX_TEMPLATE, t, i, t, i)).append(System.lineSeparator());
}
}

scriptText = sb.toString();
}

/** Benchmark's body. */
@Benchmark
public void test(Blackhole bh) {
sql.executeScript(scriptText);
}

/**
* Benchmark's entry point.
*/
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + DdlBatchingBenchmark.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
}

@Override
protected int nodes() {
return 1;
}

@Override
protected void createDistributionZoneOnStartup() {
// no-op
}

@Override
protected void createTableOnStartup() {
// no-op
}

@Override
protected @Nullable String clusterConfiguration() {
return PRODUCTION_CLUSTER_CONFIG_STRING;
}
}
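For reference, with numberOfTables = 1 and numberOfSecondaryIndexesPerTable = 2, the setUp() method above assembles a script roughly equivalent to the string below (built from the three templates, one statement per line; the expectedScript name is only illustrative and not part of this change):

String expectedScript = ""
        + "DROP TABLE IF EXISTS table_0;\n"
        + "CREATE TABLE table_0 (id INT PRIMARY KEY, c_1 INT, c_2 INT, c_3 INT, c_4 INT);\n"
        + "CREATE INDEX table_0_c_1_ind ON table_0 (c_1);\n"
        + "CREATE INDEX table_0_c_2_ind ON table_0 (c_2);\n";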


@@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.engine;

import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
@@ -236,4 +237,31 @@ void concurrentExecutionDoesntAffectSelectWithImplicitTx() {

iterateThroughResultsAndCloseThem(cursor);
}

@Test
@SuppressWarnings("unchecked")
void indexesAvailableAfterScriptExecutionAndBuiltProperly() {
long tableSize = 1_000;

@SuppressWarnings("ConcatenationWithEmptyString")
String script = ""
+ "CREATE TABLE integers (id INT PRIMARY KEY, val_1 INT, val_2 INT);"
+ "CREATE INDEX integers_val_1_ind ON integers(val_1);"
+ "INSERT INTO integers SELECT x, x, x FROM system_range(1, " + tableSize + ");"
+ "CREATE INDEX integers_val_2_ind ON integers(val_2);";

AsyncSqlCursor<InternalSqlRow> cursor = runScript(script);

iterateThroughResultsAndCloseThem(cursor);

assertQuery("SELECT /*+ FORCE_INDEX(INTEGERS_VAL_1_IND) */ COUNT(*) FROM integers WHERE val_1 > 0")
.matches(containsIndexScan("PUBLIC", "INTEGERS", "INTEGERS_VAL_1_IND"))
.returns(tableSize)
.check();

assertQuery("SELECT /*+ FORCE_INDEX(INTEGERS_VAL_2_IND) */ COUNT(*) FROM integers WHERE val_2 > 0")
.matches(containsIndexScan("PUBLIC", "INTEGERS", "INTEGERS_VAL_2_IND"))
.returns(tableSize)
.check();
}
}

This file was deleted.

@@ -17,9 +17,13 @@

package org.apache.ignite.internal.sql.engine.exec;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;

/**
@@ -37,4 +41,19 @@ public interface ExecutionService extends LifecycleAware {
CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
QueryPlan plan, SqlOperationContext operationContext
);

/**
* Executes the given batch of {@link DdlPlan}s atomically.
*
* <p>The whole batch will be executed at once. If an exception arises during execution of any plan within the batch,
* none of the changes will be saved.
*
* @param batch A batch to execute.
* @param activationTimeListener A listener to notify with the activation time of the catalog version in which all changes become visible.
* @return A future containing a list of cursors representing the result of execution, one cursor per command.
*/
CompletableFuture<List<AsyncDataCursor<InternalSqlRow>>> executeDdlBatch(
List<DdlPlan> batch,
Consumer<HybridTimestamp> activationTimeListener
);
}
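A minimal usage sketch of the new method follows; the executionService, ddlPlans, and txContext variables are assumptions for illustration and are not part of this change.

// Sketch only: assumes an ExecutionService instance and a list of already prepared DdlPlan objects.
CompletableFuture<List<AsyncDataCursor<InternalSqlRow>>> resultFuture = executionService.executeDdlBatch(
        ddlPlans,
        activationTime -> txContext.updateObservableTime(activationTime)
);

resultFuture.thenAccept(cursors -> {
    // One cursor per command, in batch order; each yields an "applied" / "not applied" result row.
    assert cursors.size() == ddlPlans.size();
});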
@@ -54,10 +54,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -407,6 +408,30 @@ public CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
}
}

@Override
public CompletableFuture<List<AsyncDataCursor<InternalSqlRow>>> executeDdlBatch(
List<DdlPlan> batch,
Consumer<HybridTimestamp> activationTimeListener
) {
List<CatalogCommand> commands = batch.stream()
.map(DdlPlan::command)
.collect(Collectors.toList());

return ddlCmdHnd.handle(commands).thenApply(result -> {
activationTimeListener.accept(HybridTimestamp.hybridTimestamp(result.getCatalogTime()));

List<AsyncDataCursor<InternalSqlRow>> cursors = new ArrayList<>(commands.size());
for (int i = 0; i < commands.size(); i++) {
List<InternalSqlRow> resultSet = result.isApplied(i) ? APPLIED_ANSWER : NOT_APPLIED_ANSWER;

cursors.add(new IteratorToDataCursorAdapter<>(resultSet.iterator()));
}

return cursors;
});
}

private AsyncDataCursor<InternalSqlRow> executeExecutablePlan(
SqlOperationContext operationContext,
ExecutablePlan plan
@@ -454,23 +479,17 @@ private AsyncDataCursor<InternalSqlRow> executeExecutablePlan(
private AsyncDataCursor<InternalSqlRow> executeDdl(SqlOperationContext operationContext,
DdlPlan plan
) {
CompletableFuture<Iterator<InternalSqlRow>> ret = ddlCmdHnd.handle(plan.command())
.thenApply(activationTime -> {
if (activationTime == null) {
return NOT_APPLIED_ANSWER.iterator();
}
CompletableFuture<Iterator<InternalSqlRow>> ret = ddlCmdHnd.handle(plan.command()).thenApply(result -> {
QueryTransactionContext txCtx = operationContext.txContext();

QueryTransactionContext txCtx = operationContext.txContext();
assert txCtx != null;

assert txCtx != null;
txCtx.updateObservableTime(HybridTimestamp.hybridTimestamp(result.getCatalogTime()));

txCtx.updateObservableTime(HybridTimestamp.hybridTimestamp(activationTime));
List<InternalSqlRow> resultSet = result.isApplied(0) ? APPLIED_ANSWER : NOT_APPLIED_ANSWER;

return APPLIED_ANSWER.iterator();
})
.exceptionally(th -> {
throw convertDdlException(th);
});
return resultSet.iterator();
});

return new IteratorToDataCursorAdapter<>(ret, Runnable::run);
}
@@ -496,23 +515,6 @@ private AsyncDataCursor<InternalSqlRow> executeKill(
return new IteratorToDataCursorAdapter<>(ret, Runnable::run);
}

private static RuntimeException convertDdlException(Throwable e) {
e = ExceptionUtils.unwrapCause(e);

if (e instanceof ConfigurationChangeException) {
assert e.getCause() != null;
// Cut off upper configuration error`s as uninformative.
e = e.getCause();
}

if (e instanceof IgniteInternalCheckedException) {
return new IgniteInternalException(INTERNAL_ERR, "Failed to execute DDL statement [stmt=" /* + qry.sql() */
+ ", err=" + e.getMessage() + ']', e);
}

return (e instanceof RuntimeException) ? (RuntimeException) e : new IgniteInternalException(INTERNAL_ERR, e);
}

@SuppressWarnings("MethodMayBeStatic")
private AsyncDataCursor<InternalSqlRow> executeExplain(ExplainPlan plan) {
String planString = plan.plan().explain();