Skip to content

Commit

Permalink
IGNITE-20780 Sql. Move session expiration to IgniteSqlImpl (apache#2785)
Browse files Browse the repository at this point in the history
  • Loading branch information
korlov42 authored Nov 6, 2023
1 parent 3125cb5 commit 41ef23d
Show file tree
Hide file tree
Showing 20 changed files with 711 additions and 364 deletions.
3 changes: 3 additions & 0 deletions modules/api/src/main/java/org/apache/ignite/sql/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
Flow.Publisher<Void> closeReactive();

/** Returns {@code true} if the given session has been closed, returns {@code false} otherwise. */
boolean closed();

/**
* Creates a new session builder from the current session.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ public Publisher<Void> closeReactive() {
throw new UnsupportedOperationException("Not implemented yet.");
}

/** {@inheritDoc} */
@Override
public boolean closed() {
// TODO IGNITE-17134 Cancel/close all active cursors, queries, futures.
return false;
}

/** {@inheritDoc} */
@Override
public SessionBuilder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ public Publisher<Void> closeReactive() {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public boolean closed() {
return false;
}

/** {@inheritDoc} */
@Override
public SessionBuilder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,17 @@ public static String threadPrefix(String nodeName, String poolName) {
public static NamedThreadFactory create(String nodeName, String poolName, IgniteLogger logger) {
return new NamedThreadFactory(threadPrefix(nodeName, poolName), logger);
}

/**
* Creates a thread factory based on a node's name and a name of the pool.
*
* @param nodeName Node name.
* @param poolName Pool name.
* @param daemon Whether threads created by the factory should be daemon or not.
* @param logger Logger.
* @return Thread factory.
*/
public static NamedThreadFactory create(String nodeName, String poolName, boolean daemon, IgniteLogger logger) {
return new NamedThreadFactory(threadPrefix(nodeName, poolName), daemon, logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,22 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

/** Test common SQL API. */
Expand All @@ -69,12 +65,12 @@ public void testSessionExpiration() throws Exception {
ResultSet rs1 = ses1.execute(null, "SELECT id FROM TST");
ResultSet rs2 = ses2.execute(null, "SELECT id FROM TST");

waitForCondition(() -> queryProcessor().liveSessions().size() == 1, 10_000);
assertTrue(waitForCondition(ses1::closed, 10_000));

// first session should no longer exist for the moment
ExecutionException err = assertThrows(ExecutionException.class, () -> ses1.executeAsync(null, "SELECT 1 + 1").get());
assertThat(err.getCause(), instanceOf(IgniteException.class));
assertThat(err.getCause().getMessage(), containsString("Session not found"));
assertThat(err.getCause().getMessage(), containsString("Session is closed"));

// already started query should fail due to session has been expired
assertThrowsWithCause(() -> {
Expand Down Expand Up @@ -143,31 +139,4 @@ public void checkTimestampOperations() {
assertEquals(expDateTimeStr, res.next().datetimeValue(1).toString());
}
}

private static class ErroneousSchemaManager implements SqlSchemaManager {

/** {@inheritDoc} */
@Override
public @Nullable SchemaPlus schema(int version) {
return null;
}

/** {@inheritDoc} */
@Override
public @Nullable SchemaPlus schema(long timestamp) {
return null;
}

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> schemaReadyFuture(int version) {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public @Nullable IgniteTable table(int schemaVersion, int tableId) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void testMetricsDuringTimeouts() throws Exception {

assertTrue(waitForCondition(() -> queryProcessor().liveSessions().isEmpty(), 10_000));

assertInternalSqlException("Session not found", () -> session.execute(null, "SELECT * from " + DEFAULT_TABLE_NAME));
assertInternalSqlException("Session is closed", () -> session.execute(null, "SELECT * from " + DEFAULT_TABLE_NAME));

assertMetricValue(clientMetricSet, SqlClientMetricSource.METRIC_OPEN_CURSORS, 1);
rs1.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public class IgniteImpl implements Ignite {
private final SqlQueryProcessor qryEngine;

/** Sql API facade. */
private final IgniteSql sql;
private final IgniteSqlImpl sql;

/** Configuration manager that handles node (local) configuration. */
private final ConfigurationManager nodeCfgMgr;
Expand Down Expand Up @@ -629,7 +629,7 @@ public class IgniteImpl implements Ignite {
systemViewManager
);

sql = new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker));
sql = new IgniteSqlImpl(name, qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker));

var deploymentManagerImpl = new DeploymentManagerImpl(
clusterSvc,
Expand Down Expand Up @@ -826,7 +826,8 @@ public CompletableFuture<Ignite> start(Path configPath) {
indexBuildingManager,
qryEngine,
clientHandlerModule,
deploymentManager
deploymentManager,
sql
);

// The system view manager comes last because other components
Expand Down
1 change: 1 addition & 0 deletions modules/sql-engine/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-storage-api')))
testImplementation(testFixtures(project(':ignite-sql-engine')))
testImplementation(testFixtures(project(':ignite-distribution-zones')))
testImplementation(testFixtures(project(':ignite-placement-driver-api')))
testImplementation(testFixtures(project(':ignite-vault')))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@
* Asynchronous result set implementation.
*/
public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE =
private static final CompletableFuture<? extends AsyncResultSet<?>> HAS_NO_MORE_PAGE_FUTURE =
CompletableFuture.failedFuture(new SqlException(CURSOR_NO_MORE_PAGES_ERR, "There are no more pages."));

private final AsyncSqlCursor<List<Object>> cur;
private final IdleExpirationTracker expirationTracker;

private final AsyncSqlCursor<List<Object>> cursor;

private volatile BatchedResult<List<Object>> curPage;

Expand All @@ -58,31 +60,43 @@ public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
/**
* Constructor.
*
* @param cur Asynchronous query cursor.
* @param cursor Query cursor representing the result of execution.
* @param page Current page.
* @param pageSize Size of the page to fetch.
* @param expirationTracker A tracker to register any interaction with given result set.
* Used to prevent session from expiration.
* @param closeRun Callback to be invoked after result is closed.
*/
public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur, BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
this.cur = cur;
AsyncResultSetImpl(
AsyncSqlCursor<List<Object>> cursor,
BatchedResult<List<Object>> page,
int pageSize,
IdleExpirationTracker expirationTracker,
Runnable closeRun
) {
this.cursor = cursor;
this.curPage = page;
this.pageSize = pageSize;
this.expirationTracker = expirationTracker;
this.closeRun = closeRun;
}

/** {@inheritDoc} */
@Override
public @Nullable ResultSetMetadata metadata() {
return hasRowSet() ? cur.metadata() : null;
return hasRowSet() ? cursor.metadata() : null;
}

/** {@inheritDoc} */
@Override
public boolean hasRowSet() {
return cur.queryType() == SqlQueryType.QUERY || cur.queryType() == SqlQueryType.EXPLAIN;
return cursor.queryType() == SqlQueryType.QUERY || cursor.queryType() == SqlQueryType.EXPLAIN;
}

/** {@inheritDoc} */
@Override
public long affectedRows() {
if (cur.queryType() != SqlQueryType.DML) {
if (cursor.queryType() != SqlQueryType.DML) {
return -1;
}

Expand All @@ -94,7 +108,7 @@ public long affectedRows() {
/** {@inheritDoc} */
@Override
public boolean wasApplied() {
if (cur.queryType() != SqlQueryType.DDL) {
if (cursor.queryType() != SqlQueryType.DDL) {
return false;
}

Expand All @@ -108,8 +122,10 @@ public boolean wasApplied() {
public Iterable<T> currentPage() {
requireResultSet();

expirationTracker.touch();

final Iterator<List<Object>> it0 = curPage.items().iterator();
final ResultSetMetadata meta0 = cur.metadata();
final ResultSetMetadata meta0 = cursor.metadata();

// TODO: IGNITE-18695 map rows to objects when mapper is provided.
return () -> new TransformingIterator<>(it0, (item) -> (T) new SqlRowImpl(item, meta0));
Expand All @@ -128,10 +144,12 @@ public int currentPageSize() {
public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
requireResultSet();

expirationTracker.touch();

if (!hasMorePages()) {
return (CompletableFuture<? extends AsyncResultSet<T>>) HAS_NO_MORE_PAGE_FUTURE;
} else {
return cur.requestNextAsync(pageSize)
return cursor.requestNextAsync(pageSize)
.thenApply(page -> {
curPage = page;

Expand All @@ -149,7 +167,7 @@ public boolean hasMorePages() {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
return cur.closeAsync().thenRun(closeRun);
return cursor.closeAsync().thenRun(closeRun);
}

private void requireResultSet() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.api;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;

/**
* A tracker to check if particular object has not been accessed longer than specified duration.
*
* <p>There are two different types of interaction:<ol>
* <li>
* `touch()` -- updates last access time. Use this method whenever access to the object
* is considered legit, thus should prevent it from expiration.
* </li>
* <li>
* `expired()` -- only checks if object wasn't been accessed longer than specified timeout,
* but doesn't update access time.
* </li>
* </ol>
*
*/
class IdleExpirationTracker {
/** Marker used to mark a session which has been expired. */
private static final long EXPIRED = 0L;

private final long idleTimeoutMs;
private final AtomicLong lastTouched;
private final CurrentTimeProvider currentTimeProvider;

IdleExpirationTracker(
long idleTimeoutMs,
CurrentTimeProvider currentTimeProvider
) {
this.idleTimeoutMs = idleTimeoutMs;
this.currentTimeProvider = currentTimeProvider;

lastTouched = new AtomicLong(currentTimeProvider.now());
}

/**
* Checks if the given object has not been accessed longer than specified duration.
*
* @return {@code true} if object is expired.
*/
@SuppressWarnings("SimplifiableIfStatement")
boolean expired() {
long last = lastTouched.get();

if (last == EXPIRED) {
return true;
}

return currentTimeProvider.now() - last > idleTimeoutMs
&& (lastTouched.compareAndSet(last, EXPIRED) || lastTouched.get() == EXPIRED);
}

/**
* Updates the timestamp that is used to determine whether the object has expired or not.
*
* @return {@code true} if this object has been updated, otherwise returns {@code false}
* meaning the object has expired.
*/
boolean touch() {
long last;
long now;
do {
last = lastTouched.get();

// tracker has been marked as expired
if (last == EXPIRED) {
return false;
}

now = currentTimeProvider.now();

if (now - last > idleTimeoutMs && expired()) {
return false;
}
} while (!lastTouched.compareAndSet(last, now));

return true;
}
}
Loading

0 comments on commit 41ef23d

Please sign in to comment.