From 37be13d1144f4e66e2ca1f1f8e1f0dbc22fb5328 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Mon, 25 Nov 2024 18:50:20 +0300 Subject: [PATCH] IGNITE-22733 Support transactions in Scan query (#11580) --- .../query/calcite/exec/ExecutionContext.java | 2 + .../internal/client/thin/TcpClientCache.java | 9 +- .../processors/cache/GridCacheAdapter.java | 14 +- .../processors/cache/GridCacheContext.java | 51 ++++ .../cache/IgniteCacheProxyImpl.java | 17 +- .../query/AbstractScanQueryIterator.java | 277 +++++++++++++++++ .../processors/cache/query/CacheQuery.java | 120 +++++++- .../GridCacheDistributedQueryManager.java | 3 +- .../cache/query/GridCacheQueryManager.java | 38 +-- .../cache/query/GridCacheQueryRequest.java | 50 +++- .../cache/query/ScanQueryIterator.java | 234 +-------------- .../transactions/TransactionChanges.java | 7 + .../datastructures/GridCacheSetImpl.java | 4 +- .../cache/AbstractTransactionalQueryTest.java | 183 ++++++++++++ .../IgniteCacheBinaryObjectsScanSelfTest.java | 20 +- ...eCacheScanPredicateDeploymentSelfTest.java | 24 +- .../GridCacheQueryTransformerSelfTest.java | 279 ++++++++++-------- .../ScanQueryTransactionIsolationTest.java | 176 +++++++++++ ...QueryTransactionsUnsupportedModesTest.java | 76 +++++ .../IgniteBinaryCacheQueryTestSuite4.java | 5 + 20 files changed, 1177 insertions(+), 412 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/AbstractScanQueryIterator.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AbstractTransactionalQueryTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionsUnsupportedModesTest.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java index 436cf49a5d51f..80800fc2d3141 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java @@ -38,6 +38,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; @@ -346,6 +347,7 @@ public static Collection transactionChanges( * @param cmp Comparator to sort new and updated entries. * @return First, set of object changed in transaction, second, list of transaction data in required format. * @param Required type. + * @see GridCacheContext#transactionChanges(Integer) */ public TransactionChanges transactionChanges( int cacheId, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java index f1966f4c1f7d2..974a3800eb392 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java @@ -1137,7 +1137,12 @@ public IgniteClientFuture removeAllConflictAsync(Map> scanQuery(ScanQuery qry) { Consumer qryWriter = payloadCh -> { - writeCacheInfo(payloadCh); + writeCacheInfo( + payloadCh, + payloadCh.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.TX_AWARE_QUERIES) + ? transactions.tx() + : null + ); BinaryOutputStream out = payloadCh.out(); @@ -1155,7 +1160,7 @@ private QueryCursor> scanQuery(ScanQuery qry) { return new ClientQueryCursor<>(new ClientQueryPager<>( ch, - null, + transactions.tx(), ClientOperation.QUERY_SCAN, ClientOperation.QUERY_SCAN_CURSOR_GET_PAGE, qryWriter, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 70032fcbfff94..fac7c9a2e6e38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; @@ -3536,8 +3537,17 @@ private Iterator> igniteIterator(boolean keepBinary, final CacheOperationContext opCtx = ctx.operationContextPerCall(); - final GridCloseableIterator> iter = ctx0.queries().createScanQuery(p, null, keepBinary, null) - .executeScanQuery(); + final TransactionChanges txChanges = ctx.transactionChanges(null); + + final GridCloseableIterator> iter = ctx0.queries().createScanQuery( + p, + null, + null, + keepBinary, + false, + null, + txChanges.changedKeys() + ).executeScanQuery(txChanges.newAndUpdatedEntries()); return ctx.itHolder().iterator(iter, new CacheIteratorConverter, Map.Entry>() { @Override protected Cache.Entry convert(Map.Entry e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index dc5b8a7b2cf2d..5cfbadabb9b26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.expiry.EternalExpiryPolicy; @@ -83,9 +85,11 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges; import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -2350,6 +2354,53 @@ public void dumpListener(DumpEntryChangeListener dumpEntryChangeLsnr) { this.dumpLsnr = dumpEntryChangeLsnr; } + /** + * @param part Partition. + * @return First, set of object changed in transaction, second, list of transaction data in required format. + * @see ExecutionContext#transactionChanges(int, int[], Function) + */ + public TransactionChanges transactionChanges(Integer part) { + if (!U.isTxAwareQueriesEnabled(ctx)) + return TransactionChanges.empty(); + + IgniteInternalTx tx = tm().tx(); + + if (tx == null) + return TransactionChanges.empty(); + + IgniteTxManager.ensureTransactionModeSupported(tx.isolation()); + + Set changedKeys = new HashSet<>(); + List newAndUpdatedRows = new ArrayList<>(); + + for (IgniteTxEntry e : tx.writeEntries()) { + if (e.cacheId() != cacheId) + continue; + + int epart = e.key().partition(); + + assert epart != -1; + + if (part != null && epart != part) + continue; + + changedKeys.add(e.key()); + + CacheObject val = e.value(); + + boolean hasEntryProcessors = !F.isEmpty(e.entryProcessors()); + + if (hasEntryProcessors) + val = e.applyEntryProcessors(val); + + // Mix only updated or inserted entries. In case val == null entry removed. + if (val != null) + newAndUpdatedRows.add(hasEntryProcessors ? F.t(e.key(), val) : e); + } + + return new TransactionChanges<>(changedKeys, newAndUpdatedRows); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, igniteInstanceName()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index dba67c351000e..9f319f0cff99a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -464,9 +465,9 @@ public IgniteCacheProxy gatewayWrapper() { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - private QueryCursor query( - final ScanQuery scanQry, - @Nullable final IgniteClosure transformer, + private QueryCursor query( + final ScanQuery scanQry, + @Nullable final IgniteClosure, R> transformer, @Nullable ClusterGroup grp ) throws IgniteCheckedException { GridCacheContext ctx = getContextSafe(); @@ -477,8 +478,10 @@ private QueryCursor query( IgniteBiPredicate p = scanQry.getFilter(); + TransactionChanges txChanges = ctx.transactionChanges(scanQry.getPartition()); + final CacheQuery qry = ctx.queries().createScanQuery( - p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal(), null); + p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal(), null, txChanges.changedKeys()); if (scanQry.getPageSize() > 0) qry.pageSize(scanQry.getPageSize()); @@ -489,7 +492,7 @@ private QueryCursor query( final GridCloseableIterator iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, cacheName, ctx, new IgniteOutClosureX>() { @Override public GridCloseableIterator applyx() throws IgniteCheckedException { - return qry.executeScanQuery(); + return qry.executeScanQuery(txChanges.newAndUpdatedEntries()); } }, true); @@ -830,7 +833,7 @@ private QueryCursor> queryContinuous(AbstractContinuousQuery q null, keepBinary, true).get(0); if (qry instanceof ScanQuery) - return query((ScanQuery)qry, null, projection(qry.isLocal())); + return query((ScanQuery)qry, null, projection(qry.isLocal())); return (QueryCursor)query(qry, projection(qry.isLocal())); } @@ -860,7 +863,7 @@ private QueryCursor> queryContinuous(AbstractContinuousQuery q validate(qry); - return query((ScanQuery)qry, transformer, projection(qry.isLocal())); + return query((ScanQuery)qry, (IgniteClosure, R>)transformer, projection(qry.isLocal())); } catch (Exception e) { if (e instanceof CacheException) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/AbstractScanQueryIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/AbstractScanQueryIterator.java new file mode 100644 index 0000000000000..dbf156ed8dce1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/AbstractScanQueryIterator.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.NoSuchElementException; +import java.util.UUID; +import javax.cache.Cache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.events.CacheQueryReadEvent; +import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; +import org.apache.ignite.internal.processors.security.SecurityUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.injectResources; +import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; + +/** + * Abstract scan query iterator. + */ +public abstract class AbstractScanQueryIterator extends GridCloseableIteratorAdapter { + /** */ + private final IgniteBiPredicate filter; + + /** */ + private final Runnable closeFilterClo; + + /** */ + protected final boolean statsEnabled; + + /** */ + private final boolean keepBinary; + + /** */ + private final boolean readEvt; + + /** */ + private final UUID subjId; + + /** */ + private final String taskName; + + /** */ + private final IgniteClosure, R> transform; + + /** */ + protected final GridCacheContext cctx; + + /** */ + private final boolean locNode; + + /** */ + private R next; + + /** */ + private boolean needAdvance; + + /** + * @param cctx Grid cache context. + * @param qry Query adapter. + * @param transform Optional transformer. + * @param locNode Flag for local node iterator. + * @throws IgniteCheckedException If failed. + */ + protected AbstractScanQueryIterator( + GridCacheContext cctx, + CacheQuery qry, + IgniteClosure, R> transform, + boolean locNode + ) throws IgniteCheckedException { + this.cctx = cctx; + this.locNode = locNode; + closeFilterClo = qry.scanFilter() instanceof PlatformCacheEntryFilter + ? () -> closeFilter(qry.scanFilter()) + : null; + filter = prepareFilter(qry.scanFilter()); + this.transform = SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteClosure.class, injectResources(transform, cctx)); + + statsEnabled = cctx.statisticsEnabled(); + + readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) && + cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ); + + taskName = readEvt ? cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null; + + subjId = securitySubjectId(cctx); + + // keep binary for remote scans if possible + keepBinary = (!locNode && filter == null && transform == null && !readEvt) || qry.keepBinary(); + + needAdvance = true; + } + + /** {@inheritDoc} */ + @Override protected R onNext() { + if (needAdvance) + next = advance(); + else + needAdvance = true; + + if (next == null) + throw new NoSuchElementException(); + + return next; + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() { + if (needAdvance) { + next = advance(); + + needAdvance = false; + } + + return next != null; + } + + /** {@inheritDoc} */ + @Override protected void onClose() { + if (closeFilterClo != null) + closeFilterClo.run(); + } + + /** Moves the iterator to the next cache entry. */ + protected abstract R advance(); + + /** + * Perform filtering and transformation of key-value pair. + * + * @return Object to return to the user, or {@code null} if filtered. + */ + public R filterAndTransform( + final KeyCacheObject key, + final CacheObject val, + final long start + ) { + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.onRead(true); + + metrics.addGetTimeNanos(System.nanoTime() - start); + } + + K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(cctx.cacheObjectContext(), key, keepBinary, false); + V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(cctx.cacheObjectContext(), val, keepBinary, false); + + if (filter != null) { + try { + if (!filter.apply(key0, val0)) + return null; + } + catch (Throwable e) { + throw new IgniteException(e); + } + } + + if (readEvt) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "Scan query entry read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SCAN.name(), + cctx.name(), + null, + null, + filter, + null, + null, + subjId, + taskName, + key0, + val0, + null, + null)); + } + + if (transform != null) { + try { + return transform.apply(new CacheQueryEntry<>(key0, val0)); + } + catch (Throwable e) { + throw new IgniteException(e); + } + } + + return (R)(!locNode ? new T2<>(key0, val0) : + new CacheQueryEntry<>(key0, val0)); + } + + /** */ + @Nullable + public IgniteBiPredicate filter() { + return filter; + } + + /** */ + private @Nullable IgniteBiPredicate prepareFilter(IgniteBiPredicate keyValFilter) throws IgniteCheckedException { + if (keyValFilter == null) + return null; + + try { + if (keyValFilter instanceof PlatformCacheEntryFilter) + ((PlatformCacheEntryFilter)keyValFilter).cacheContext(cctx); + else + injectResources(keyValFilter, cctx); + + return SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteBiPredicate.class, keyValFilter); + } + catch (IgniteCheckedException | RuntimeException e) { + closeFilter(keyValFilter); + + throw e; + } + } + + /** */ + public static void closeFilter(IgniteBiPredicate filter) { + if (filter instanceof PlatformCacheEntryFilter) + ((PlatformCacheEntryFilter)filter).onClose(); + } + + /** */ + public IgniteClosure, R> transformer() { + return transform; + } + + /** */ + public boolean local() { + return locNode; + } + + /** */ + public boolean keepBinary() { + return keepBinary; + } + + /** */ + public UUID subjectId() { + return subjId; + } + + /** */ + public String taskName() { + return taskName; + } + + /** */ + public GridCacheContext cacheContext() { + return cctx; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 192c8f1b17899..36fa60bc81d77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -22,12 +22,14 @@ import java.util.Collections; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; +import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -46,14 +48,18 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; @@ -63,9 +69,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.plugin.security.SecurityPermission; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX; @@ -261,7 +269,7 @@ public class CacheQuery { private IgniteClosure transform; /** Partition. */ - private Integer part; + private final Integer part; /** */ private final boolean incMeta; @@ -291,7 +299,10 @@ public class CacheQuery { private int taskHash; /** */ - private Boolean dataPageScanEnabled; + private final Boolean dataPageScanEnabled; + + /** */ + private final Collection skipKeys; /** * Cache query adapter for SCAN query. @@ -303,6 +314,7 @@ public class CacheQuery { * @param keepBinary Keep binary flag. * @param forceLocal Flag to force local query. * @param dataPageScanEnabled Flag to enable data page scan. + * @param skipKeys Set of keys that must be skiped during iteration. */ public CacheQuery( GridCacheContext cctx, @@ -312,9 +324,12 @@ public CacheQuery( @Nullable Integer part, boolean keepBinary, boolean forceLocal, - Boolean dataPageScanEnabled + Boolean dataPageScanEnabled, + @Nullable Set skipKeys ) { - this(cctx, type, null, null, filter, part, false, keepBinary, dataPageScanEnabled, null); + this(cctx, type, null, null, filter, part, false, keepBinary, dataPageScanEnabled, null, skipKeys); + + assert F.isEmpty(skipKeys) || type == SCAN; this.transform = transform; this.forceLocal = forceLocal; @@ -332,6 +347,7 @@ public CacheQuery( * @param incMeta Include metadata flag. * @param keepBinary Keep binary flag. * @param dataPageScanEnabled Flag to enable data page scan. + * @param skipKeys Set of keys that must be skiped during iteration. */ public CacheQuery( GridCacheContext cctx, @@ -343,7 +359,8 @@ public CacheQuery( boolean incMeta, boolean keepBinary, Boolean dataPageScanEnabled, - IndexQueryDesc idxQryDesc + IndexQueryDesc idxQryDesc, + @Nullable Collection skipKeys ) { assert cctx != null; assert type != null; @@ -359,6 +376,7 @@ public CacheQuery( this.keepBinary = keepBinary; this.dataPageScanEnabled = dataPageScanEnabled; this.idxQryDesc = idxQryDesc; + this.skipKeys = skipKeys; log = cctx.logger(getClass()); } @@ -383,6 +401,7 @@ public CacheQuery( * @param keepBinary Keep binary flag. * @param taskHash Task hash. * @param dataPageScanEnabled Flag to enable data page scan. + * @param skipKeys Set of keys that must be skiped during iteration. */ public CacheQuery( GridCacheContext cctx, @@ -402,7 +421,8 @@ public CacheQuery( boolean incMeta, boolean keepBinary, int taskHash, - Boolean dataPageScanEnabled + Boolean dataPageScanEnabled, + @Nullable Collection skipKeys ) { this.cctx = cctx; this.type = type; @@ -422,6 +442,7 @@ public CacheQuery( this.keepBinary = keepBinary; this.taskHash = taskHash; this.dataPageScanEnabled = dataPageScanEnabled; + this.skipKeys = skipKeys; } /** @@ -442,7 +463,7 @@ public CacheQuery( @Nullable String clsName, @Nullable IgniteBiPredicate filter ) { - this(cctx, type, clsName, null, filter, part, false, false, null, idxQryDesc); + this(cctx, type, clsName, null, filter, part, false, false, null, idxQryDesc, null); } /** @return Flag to enable data page scan. */ @@ -450,6 +471,11 @@ public Boolean isDataPageScanEnabled() { return dataPageScanEnabled; } + /** @return Set of keys that must be skiped during iteration. */ + public Collection skipKeys() { + return skipKeys; + } + /** @return Type. */ public GridCacheQueryType type() { return type; @@ -714,8 +740,11 @@ private CacheQueryFuture execute0(@Nullable IgniteReducer rmtReduce return (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } - /** @return Scan query iterator. */ - public GridCloseableIterator executeScanQuery() throws IgniteCheckedException { + /** + * @param newAndUpdatedEntries Collection of entries created or updated in transaction. + * @return Scan query iterator. + */ + public GridCloseableIterator executeScanQuery(List newAndUpdatedEntries) throws IgniteCheckedException { assert type == SCAN : "Wrong processing of query: " + type; GridDhtCacheAdapter cacheAdapter = cctx.isNear() ? cctx.near().dht() : cctx.dht(); @@ -756,14 +785,15 @@ public GridCloseableIterator executeScanQuery() throws IgniteCheckedException { final GridCacheQueryManager qryMgr = cctx.queries(); - boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); + final GridCloseableIterator iter = (nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId())) + ? qryMgr.scanQueryLocal(this, true) + : part != null + ? new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx) + : qryMgr.scanQueryDistributed(this, nodes); - if (loc) - return qryMgr.scanQueryLocal(this, true); - else if (part != null) - return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); - else - return qryMgr.scanQueryDistributed(this, nodes); + return F.isEmpty(newAndUpdatedEntries) + ? iter + : iteratorWithTxData(iter, newAndUpdatedEntries); } /** @return Nodes to execute on. */ @@ -842,6 +872,64 @@ private static Collection nodes(final GridCacheContext cctx, }); } + /** */ + private @NotNull GridCloseableIterator iteratorWithTxData( + final GridCloseableIterator iter, + List newAndUpdatedEntries + ) throws IgniteCheckedException { + IgniteClosure, T> t0 = (IgniteClosure, T>)transform; + + final GridIterator txIter = new AbstractScanQueryIterator<>((GridCacheContext)cctx, this, t0, true) { + private final Iterator txData = newAndUpdatedEntries.iterator(); + + /** {@inheritDoc} */ + @Override protected T advance() { + long start = System.nanoTime(); + + while (txData.hasNext()) { + final Object e = txData.next(); + final KeyCacheObject key; + final CacheObject val; + + if (e instanceof IgniteTxEntry) { + key = ((IgniteTxEntry)e).key(); + val = ((IgniteTxEntry)e).value(); + } + else { + key = ((IgniteBiTuple)e).get1(); + val = ((IgniteBiTuple)e).get2(); + } + + T next = filterAndTransform(key, val, start); + + if (next != null) + return next; + } + + return null; + } + }; + + return new GridCloseableIteratorAdapter<>() { + /** {@inheritDoc} */ + @Override protected T onNext() { + return iter.hasNext() ? iter.next() : txIter.next(); + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() { + return iter.hasNext() || txIter.hasNext(); + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + iter.close(); + + super.onClose(); + } + }; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheQuery.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 5e959eac1b4df..be16b2e73497e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -263,7 +263,8 @@ protected void removeQueryFuture(long reqId) { req.includeMetaData(), req.keepBinary(), req.taskHash(), - req.isDataPageScanEnabled() + req.isDataPageScanEnabled(), + req.skipKeys() ); return new GridCacheQueryInfo( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 0d386cd2c2cbe..cf9d04c9b54f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -772,7 +772,8 @@ private GridCloseableIterator> sharedCacheSetIterator(CacheQ qry.partition(), false, true, - qry.isDataPageScanEnabled()); + qry.isDataPageScanEnabled(), + null); return scanQueryLocal(qry0, false); } @@ -809,7 +810,7 @@ private GridCloseableIterator scanIterator(final CacheQuery qry, IgniteClosur final GridDhtLocalPartition locPart; - final GridIterator it; + GridIterator it; if (part != null) { final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); @@ -845,6 +846,15 @@ private GridCloseableIterator scanIterator(final CacheQuery qry, IgniteClosur qry.isDataPageScanEnabled()); } + final Set skipKeys = qry.skipKeys() == null ? Collections.emptySet() : new HashSet<>(qry.skipKeys()); + + if (!F.isEmpty(skipKeys)) { + // Intentionally use of `Set#remove` here. + // We want to perform as few `toKey` as possible. + // So we break some rules here to optimize work with the data provided by the underlying cursor. + it = F.iterator0(it, true, e -> skipKeys.isEmpty() || !skipKeys.remove(e.key())); + } + ScanQueryIterator iter = new ScanQueryIterator(it, qry, topVer, locPart, transformer, locNode, locNode ? locIters : null, cctx, log); @@ -2844,23 +2854,10 @@ public CacheQuery createSpiQuery(boolean keepBinary) { null, keepBinary, false, + null, null); } - /** - * Creates user's predicate based scan query. - * - * @param filter Scan filter. - * @param part Partition. - * @param keepBinary Keep binary flag. - * @param dataPageScanEnabled Flag to enable data page scan. - * @return Created query. - */ - public CacheQuery createScanQuery(@Nullable IgniteBiPredicate filter, - @Nullable Integer part, boolean keepBinary, Boolean dataPageScanEnabled) { - return createScanQuery(filter, null, part, keepBinary, false, dataPageScanEnabled); - } - /** * Creates user's predicate based scan query. * @@ -2870,6 +2867,7 @@ public CacheQuery createScanQuery(@Nullable IgniteBiPredicate filte * @param keepBinary Keep binary flag. * @param forceLocal Flag to force local scan. * @param dataPageScanEnabled Flag to enable data page scan. + * @param skipKeys Set of keys that must be skiped during iteration. * @return Created query. */ @SuppressWarnings("unchecked") @@ -2879,7 +2877,8 @@ public CacheQuery createScanQuery( @Nullable Integer part, boolean keepBinary, boolean forceLocal, - Boolean dataPageScanEnabled + Boolean dataPageScanEnabled, + Set skipKeys ) { return new CacheQuery(cctx, SCAN, @@ -2888,7 +2887,8 @@ public CacheQuery createScanQuery( part, keepBinary, forceLocal, - dataPageScanEnabled); + dataPageScanEnabled, + skipKeys); } /** @@ -2916,6 +2916,7 @@ public CacheQuery> createFullTextQuery(String clsName, false, keepBinary, null, + null, null) .limit(limit) .pageSize(pageSize); @@ -2999,5 +3000,4 @@ public boolean isCanceled(Long key) { return canceled != null && canceled.contains(key); } } - } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 3129a991e8146..abd10c640e5bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -19,13 +19,17 @@ import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -35,6 +39,7 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -147,6 +152,10 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac /** */ private AffinityTopologyVersion topVer; + /** Set of keys that must be skiped during iteration. */ + @GridDirectCollection(KeyCacheObject.class) + private Collection skipKeys; + /** */ private byte flags; @@ -194,7 +203,8 @@ public static GridCacheQueryRequest startQueryRequest(GridCacheContext cct cctx.affinity().affinityTopologyVersion(), // Force deployment anyway if scan query is used. cctx.deploymentEnabled() || deployFilterOrTransformer, - qry.isDataPageScanEnabled()); + qry.isDataPageScanEnabled(), + qry.skipKeys()); } /** @@ -328,6 +338,7 @@ private GridCacheQueryRequest( * @param taskHash Task name hash code. * @param topVer Topology version. * @param addDepInfo Deployment info flag. + * @param skipKeys Set of keys that must be skiped during iteration. */ private GridCacheQueryRequest( int cacheId, @@ -351,7 +362,8 @@ private GridCacheQueryRequest( int taskHash, AffinityTopologyVersion topVer, boolean addDepInfo, - Boolean dataPageScanEnabled + Boolean dataPageScanEnabled, + @Nullable Collection skipKeys ) { assert type != null || fields; assert clause != null || (type == SCAN || type == SET || type == SPI || type == INDEX); @@ -378,6 +390,7 @@ private GridCacheQueryRequest( this.taskHash = taskHash; this.topVer = topVer; this.addDepInfo = addDepInfo; + this.skipKeys = skipKeys; flags = setDataPageScanEnabled(flags, dataPageScanEnabled); } @@ -444,6 +457,11 @@ private static byte setDataPageScanEnabled(int flags, Boolean enabled) { idxQryDescBytes = CU.marshal(cctx, idxQryDesc); } + + if (!F.isEmpty(skipKeys)) { + for (KeyCacheObject k : skipKeys) + k.prepareMarshal(cctx.cacheObjectContext()); + } } /** {@inheritDoc} */ @@ -468,6 +486,13 @@ private static byte setDataPageScanEnabled(int flags, Boolean enabled) { if (idxQryDescBytes != null && idxQryDesc == null) idxQryDesc = U.unmarshal(mrsh, idxQryDescBytes, clsLdr); + + if (!F.isEmpty(skipKeys)) { + CacheObjectContext objCtx = ctx.cacheObjectContext(cacheId); + + for (KeyCacheObject k : skipKeys) + k.finishUnmarshal(objCtx, ldr); + } } /** {@inheritDoc} */ @@ -638,6 +663,11 @@ public Boolean isDataPageScanEnabled() { return null; } + /** @return Set of keys that must be skiped during iteration. */ + public Collection skipKeys() { + return skipKeys; + } + /** * @return partition. */ @@ -791,6 +821,12 @@ public Boolean isDataPageScanEnabled() { return false; writer.incrementState(); + + case 26: + if (!writer.writeCollection("skipKeys", skipKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); } return true; @@ -982,6 +1018,14 @@ public Boolean isDataPageScanEnabled() { case 25: idxQryDescBytes = reader.readByteArray("idxQryDescBytes"); + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 26: + skipKeys = reader.readCollection("skipKeys", MessageCollectionItemType.MSG); + if (!reader.isLastRead()) return false; @@ -998,7 +1042,7 @@ public Boolean isDataPageScanEnabled() { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 26; + return 27; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java index 24408ac9486f3..76187359bf150 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java @@ -17,19 +17,12 @@ package org.apache.ignite.internal.processors.cache.query; -import java.util.NoSuchElementException; -import java.util.UUID; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -39,24 +32,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; -import org.apache.ignite.internal.processors.security.SecurityUtils; -import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridIterator; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; -import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.injectResources; -import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; - /** */ -public final class ScanQueryIterator extends GridCloseableIteratorAdapter { +public final class ScanQueryIterator extends AbstractScanQueryIterator { /** */ private static final long serialVersionUID = 0L; @@ -66,15 +50,6 @@ public final class ScanQueryIterator extends GridCloseableIteratorAdapt /** */ private final GridDhtLocalPartition locPart; - /** */ - private final IgniteBiPredicate filter; - - /** */ - private final Runnable closeFilterClo; - - /** */ - private final boolean statsEnabled; - /** */ private final GridIterator it; @@ -84,45 +59,12 @@ public final class ScanQueryIterator extends GridCloseableIteratorAdapt /** */ private final AffinityTopologyVersion topVer; - /** */ - private final boolean keepBinary; - - /** */ - private final boolean readEvt; - - /** */ - private final String cacheName; - - /** */ - private final UUID subjId; - - /** */ - private final String taskName; - - /** */ - private final IgniteClosure, R> transform; - - /** */ - private final CacheObjectContext objCtx; - - /** */ - private final GridCacheContext cctx; - /** */ private final IgniteLogger log; - /** */ - private R next; - - /** */ - private boolean needAdvance; - /** */ private IgniteCacheExpiryPolicy expiryPlc; - /** */ - private final boolean locNode; - /** */ private final boolean incBackups; @@ -156,69 +98,26 @@ public final class ScanQueryIterator extends GridCloseableIteratorAdapt @Nullable GridConcurrentHashSet> locIters, GridCacheContext cctx, IgniteLogger log) throws IgniteCheckedException { + super(cctx, qry, transformer, locNode); + assert !locNode || locIters != null : "Local iterators can't be null for local query."; this.it = it; this.topVer = topVer; this.locPart = locPart; - this.cctx = cctx; this.log = log; - this.locNode = locNode; this.locIters = locIters; incBackups = qry.includeBackups(); - statsEnabled = cctx.statisticsEnabled(); - - readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) && - cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ); - - taskName = readEvt ? cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null; - - subjId = securitySubjectId(cctx); - dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); cache = dht != null ? dht : cctx.cache(); - objCtx = cctx.cacheObjectContext(); - cacheName = cctx.name(); - needAdvance = true; expiryPlc = this.cctx.cache().expiryPolicy(null); startTime = U.currentTimeMillis(); pageSize = qry.pageSize(); - transform = SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteClosure.class, injectResources(transformer, cctx)); - closeFilterClo = qry.scanFilter() instanceof PlatformCacheEntryFilter - ? () -> closeFilter(qry.scanFilter()) - : null; - filter = prepareFilter(qry.scanFilter()); - // keep binary for remote scans if possible - keepBinary = (!locNode && filter == null && transformer == null && !readEvt) || qry.keepBinary(); - } - - /** {@inheritDoc} */ - @Override protected R onNext() { - if (needAdvance) - advance(); - else - needAdvance = true; - - if (next == null) - throw new NoSuchElementException(); - - return next; - } - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() { - if (needAdvance) { - advance(); - - needAdvance = false; - } - - return next != null; } /** {@inheritDoc} */ @@ -232,17 +131,14 @@ public final class ScanQueryIterator extends GridCloseableIteratorAdapt if (locPart != null) locPart.release(); - if (closeFilterClo != null) - closeFilterClo.run(); + super.onClose(); if (locIters != null) locIters.remove(this); } - /** - * Moves the iterator to the next cache entry. - */ - private void advance() { + /** {@inheritDoc} */ + @Override protected R advance() { long start = statsEnabled ? System.nanoTime() : 0L; R next0 = null; @@ -317,74 +213,20 @@ private void advance() { } if (val != null) { - K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false); - V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false); - - if (statsEnabled) { - CacheMetricsImpl metrics = cctx.cache().metrics0(); - - metrics.onRead(true); - - metrics.addGetTimeNanos(System.nanoTime() - start); - } - - boolean passFilter; - - try { - passFilter = filter == null || filter.apply(key0, val0); - } - catch (Throwable e) { - throw new IgniteException(e); - } - - if (passFilter) { - if (readEvt) { - cctx.gridEvents().record(new CacheQueryReadEvent<>( - cctx.localNode(), - "Scan query entry read.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.SCAN.name(), - cacheName, - null, - null, - filter, - null, - null, - subjId, - taskName, - key0, - val0, - null, - null)); - } - - if (transform != null) { - try { - next0 = transform.apply(new CacheQueryEntry<>(key0, val0)); - } - catch (Throwable e) { - throw new IgniteException(e); - } - } - else - next0 = (R)(!locNode ? new T2<>(key0, val0) : - new CacheQueryEntry<>(key0, val0)); + next0 = filterAndTransform(key, val, start); + if (next0 != null) break; - } } } - if ((this.next = next0) == null && expiryPlc != null && dht != null) { + if (next0 == null && expiryPlc != null && dht != null) { dht.sendTtlUpdateRequest(expiryPlc); expiryPlc = null; } - } - /** */ - @Nullable public IgniteBiPredicate filter() { - return filter; + return next0; } /** */ @@ -397,69 +239,13 @@ public GridDhtLocalPartition localPartition() { return locPart; } - /** */ - public IgniteClosure, R> transformer() { - return transform; - } - /** */ public long startTime() { return startTime; } - /** */ - public boolean local() { - return locNode; - } - - /** */ - public boolean keepBinary() { - return keepBinary; - } - - /** */ - public UUID subjectId() { - return subjId; - } - - /** */ - public String taskName() { - return taskName; - } - - /** */ - public GridCacheContext cacheContext() { - return cctx; - } - /** */ public int pageSize() { return pageSize; } - - /** */ - private @Nullable IgniteBiPredicate prepareFilter(IgniteBiPredicate filter) throws IgniteCheckedException { - if (filter == null) - return null; - - try { - if (filter instanceof PlatformCacheEntryFilter) - ((PlatformCacheEntryFilter)filter).cacheContext(cctx); - else - injectResources(filter, cctx); - - return SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteBiPredicate.class, filter); - } - catch (IgniteCheckedException | RuntimeException e) { - closeFilter(filter); - - throw e; - } - } - - /** */ - public static void closeFilter(IgniteBiPredicate filter) { - if (filter instanceof PlatformCacheEntryFilter) - ((PlatformCacheEntryFilter)filter).onClose(); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionChanges.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionChanges.java index c26fe450a0735..a36dbfc532591 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionChanges.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionChanges.java @@ -45,6 +45,13 @@ public TransactionChanges(Set changedKeys, List newAndUpdated this.newAndUpdatedEntries = newAndUpdatedEntries; } + /** + * @return Changed keys set. + */ + public Set changedKeys() { + return changedKeys; + } + /** @return New and changed entries. */ public List newAndUpdatedEntries() { return newAndUpdatedEntries; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 23a9bdea14550..3d05aa47093d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -166,7 +166,7 @@ boolean checkHeader() throws IgniteCheckedException { CacheQuery qry = new CacheQuery<>(ctx, SET, new GridSetQueryPredicate<>(id, collocated), null, collocated ? hdrPart : null, - false, false, null); + false, false, null, null); Collection nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -441,7 +441,7 @@ private GridCloseableIterator iterator0(boolean keepBinary) { private WeakReferenceCloseableIterator sharedCacheIterator(boolean keepBinary) throws IgniteCheckedException { CacheQuery qry = new CacheQuery<>(ctx, SET, new GridSetQueryPredicate<>(id, collocated), null, collocated ? hdrPart : null, - keepBinary, false, null); + keepBinary, false, null, null); Collection nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AbstractTransactionalQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AbstractTransactionalQueryTest.java new file mode 100644 index 0000000000000..f06737b4b93c9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AbstractTransactionalQueryTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.RunnableX; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** */ +@RunWith(Parameterized.class) +public abstract class AbstractTransactionalQueryTest extends GridCommonAbstractTest { + /** */ + @Parameterized.Parameter() + public TestTransactionMode txMode; + + /** @return Test parameters. */ + @Parameterized.Parameters(name = "txMode={0}") + public static Collection parameters() { + return Arrays.asList(TestTransactionMode.values()); + } + + /** */ + protected static TestTransactionMode currentMode; + + /** */ + protected static Transaction tx; + + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + if (currentMode != null && txMode == currentMode) + return; + + currentMode = txMode; + + clearTransaction(); + + stopAllGrids(); + + init(); + } + + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + clearTransaction(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + clearTransaction(); + + stopAllGrids(); + + currentMode = null; + + tx = null; + + super.afterTestsStopped(); + } + + /** */ + protected abstract void init() throws Exception; + + /** */ + protected CacheAtomicityMode atomicity() { + return txMode == TestTransactionMode.NONE ? ATOMIC : TRANSACTIONAL; + } + + /** */ + protected void put(Ignite node, IgniteCache cache, K key, V val) { + invokeAction(node, () -> cache.put(key, val)); + } + + /** */ + private void invokeAction(Ignite node, RunnableX action) { + if (tx == null && txMode != TestTransactionMode.NONE) + startTransaction(node); + + switch (txMode) { + case ALL: + txAction(node, action); + + break; + case NONE: + action.run(); + + break; + case RANDOM: + if (ThreadLocalRandom.current().nextBoolean()) + action.run(); + else + txAction(node, action); + + break; + default: + throw new IllegalArgumentException(); + } + } + + /** */ + public void txAction(Ignite node, RunnableX action) { + tx.resume(); + + try { + action.run(); + } + finally { + tx.suspend(); + } + } + + /** */ + protected void startTransaction(Ignite node) { + tx = node.transactions().txStart(PESSIMISTIC, READ_COMMITTED, getTestTimeout(), 100); + + tx.suspend(); + } + + /** */ + protected void clearTransaction() { + if (tx == null) + return; + + tx.resume(); + + tx.rollback(); + + tx = null; + } + + /** */ + public enum TestTransactionMode { + /** All put, remove and SQL dml will be executed inside transaction. */ + ALL, + + /** Only some DML operations will be executed inside transaction. */ + RANDOM, + + /** Don't use transaction for DML. */ + NONE + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java index e6d528623f8af..f3d8f069a4993 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java @@ -26,13 +26,12 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; /** * */ -public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest { +public class IgniteCacheBinaryObjectsScanSelfTest extends AbstractTransactionalQueryTest { /** */ private static final String PERSON_CLS_NAME = "org.apache.ignite.tests.p2p.cache.Person"; @@ -43,7 +42,7 @@ public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest private static ClassLoader ldr; /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { + @Override protected void init() throws Exception { ldr = getExternalClassLoader(); startGrids(3); @@ -56,6 +55,8 @@ public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { ldr = null; + + super.afterTestsStopped(); } /** {@inheritDoc} */ @@ -90,7 +91,8 @@ private void populateCache(ClassLoader ldr) throws Exception { Ignite client = grid("client"); - CacheConfiguration cfg = new CacheConfiguration<>("testCache"); + CacheConfiguration cfg = new CacheConfiguration<>("testCache") + .setAtomicityMode(atomicity()); IgniteCache cache = client.getOrCreateCache(cfg); @@ -99,7 +101,7 @@ private void populateCache(ClassLoader ldr) throws Exception { GridTestUtils.setFieldValue(key, "id", i); - cache.put(key, cls.newInstance()); + put(client, cache, key, cls.newInstance()); } } @@ -110,6 +112,14 @@ private void populateCache(ClassLoader ldr) throws Exception { public void testScanNoClasses() throws Exception { Ignite client = grid("client"); + if (txMode == TestTransactionMode.NONE) + doTestScanNoClasses(client); + else + txAction(client, () -> doTestScanNoClasses(client)); + } + + /** */ + private static void doTestScanNoClasses(Ignite client) { IgniteCache cache = client.cache("testCache"); List> entries = cache.query(new ScanQuery<>()).getAll(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java index ad2dc70489258..aa5b744f7d7e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheScanPredicateDeploymentSelfTest.java @@ -23,8 +23,8 @@ import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -35,7 +35,7 @@ /** * */ -public class IgniteCacheScanPredicateDeploymentSelfTest extends GridCommonAbstractTest { +public class IgniteCacheScanPredicateDeploymentSelfTest extends AbstractTransactionalQueryTest { /** Test value. */ protected static final String TEST_PREDICATE = "org.apache.ignite.tests.p2p.CacheDeploymentAlwaysTruePredicate"; @@ -53,6 +53,11 @@ public class IgniteCacheScanPredicateDeploymentSelfTest extends GridCommonAbstra return cfg; } + /** {@inheritDoc} */ + @Override protected void init() throws Exception { + // No-op + } + /** * @return Cache configuration. * @throws Exception In case of error. @@ -88,17 +93,26 @@ public void testDeployScanPredicate() throws Exception { // It is important that there are no too many keys. for (int i = 0; i < 1; i++) - cache.put(i, i); + put(grid(3), cache, i, i); Class predCls = grid(3).configuration().getClassLoader().loadClass(TEST_PREDICATE); IgniteBiPredicate pred = (IgniteBiPredicate)predCls.newInstance(); - List> all = cache.query(new ScanQuery<>(pred)).getAll(); + RunnableX check = () -> { + List> all = cache.query(new ScanQuery<>(pred)).getAll(); - assertEquals(1, all.size()); + assertEquals(1, all.size()); + }; + + if (txMode == TestTransactionMode.NONE) + check.run(); + else + txAction(grid(3), check); } finally { + clearTransaction(); + stopAllGrids(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java index 76420397ed685..505a99e74a03f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java @@ -22,6 +22,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.IntFunction; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -33,20 +37,25 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.AbstractTransactionalQueryTest; +import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static org.junit.Assume.assumeTrue; + /** * Test for scan query with transformer. */ -public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest { +public class GridCacheQueryTransformerSelfTest extends AbstractTransactionalQueryTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -57,22 +66,40 @@ public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { + @Override protected void init() throws Exception { + stopAllGrids(); startGridsMultiThreaded(3); startClientGrid(); } + /** */ + public void doTestWithCache(IntFunction val, Consumer> test) { + doTestWithCache(val, 50, test); + } + + /** */ + public void doTestWithCache(IntFunction val, int numEntries, Consumer> test) { + IgniteCache cache = createTestCache(val, numEntries); + + try { + if (txMode == TestTransactionMode.NONE) + test.accept(cache); + else + txAction(grid(), () -> test.accept(cache)); + } + finally { + clearTransaction(); + + cache.destroy(); + } + } + /** * @throws Exception If failed. */ @Test public void testGetKeys() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, "val" + i); - + doTestWithCache(i -> "val" + i, cache -> { IgniteClosure, Integer> transformer = new IgniteClosure, Integer>() { @Override public Integer apply(Cache.Entry e) { @@ -88,10 +115,7 @@ public void testGetKeys() throws Exception { for (int i = 0; i < 50; i++) assertEquals(i, keys.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -99,12 +123,7 @@ public void testGetKeys() throws Exception { */ @Test public void testGetKeysFiltered() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, "val" + i); - + doTestWithCache(i -> "val" + i, cache -> { IgniteBiPredicate filter = new IgniteBiPredicate() { @Override public boolean apply(Integer k, String v) { return k % 10 == 0; @@ -126,10 +145,7 @@ public void testGetKeysFiltered() throws Exception { for (int i = 0; i < 5; i++) assertEquals(i * 10, keys.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -137,12 +153,7 @@ public void testGetKeysFiltered() throws Exception { */ @Test public void testGetObjectField() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); - + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { IgniteClosure, Integer> transformer = new IgniteClosure, Integer>() { @Override public Integer apply(Cache.Entry e) { @@ -158,38 +169,26 @@ public void testGetObjectField() throws Exception { for (int i = 0; i < 50; i++) assertEquals(i * 100, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** * @throws Exception If failed. */ @Test - public void testGetObjectFieldPartitioned() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - Affinity aff = affinity(cache); + public void testGetObjectFieldPartitioned() { + IgniteCache cache = createTestCache(); - try { - int[] keys = new int[50]; + Consumer putToCache = i -> put(grid(), cache, i, new Value("str" + i, i * 100)); - for (int i = 0, j = 0; i < keys.length; j++) { - if (aff.partition(j) == 0) - keys[i++] = j; - } + List keys = partitionKeys(cache, 0, 50, 0); - for (int i : keys) - cache.put(i, new Value("str" + i, i * 100)); + keys.forEach(putToCache); + partitionKeys(cache, 1, 5, 0).forEach(putToCache); + partitionKeys(cache, 2, 5, 0).forEach(putToCache); - IgniteClosure, Integer> transformer = - new IgniteClosure, Integer>() { - @Override public Integer apply(Cache.Entry e) { - return e.getValue().idx; - } - }; + RunnableX check = () -> { + IgniteClosure, Integer> transformer = e -> e.getValue().idx; List res = cache.query(new ScanQuery().setPartition(0), transformer).getAll(); @@ -197,10 +196,19 @@ public void testGetObjectFieldPartitioned() throws Exception { Collections.sort(res); - for (int i = 0; i < keys.length; i++) - assertEquals(keys[i] * 100, res.get(i).intValue()); + for (int i = 0; i < keys.size(); i++) + assertEquals(keys.get(i) * 100, res.get(i).intValue()); + }; + + try { + if (txMode == TestTransactionMode.NONE) + check.run(); + else + txAction(grid(), check); } finally { + clearTransaction(); + cache.destroy(); } } @@ -210,12 +218,7 @@ public void testGetObjectFieldPartitioned() throws Exception { */ @Test public void testGetObjectFieldFiltered() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); - + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { IgniteBiPredicate filter = new IgniteBiPredicate() { @Override public boolean apply(Integer k, Value v) { return v.idx % 1000 == 0; @@ -237,10 +240,7 @@ public void testGetObjectFieldFiltered() throws Exception { for (int i = 0; i < 5; i++) assertEquals(i * 1000, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -248,12 +248,7 @@ public void testGetObjectFieldFiltered() throws Exception { */ @Test public void testKeepBinary() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); - + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { IgniteCache binaryCache = cache.withKeepBinary(); IgniteClosure, Integer> transformer = @@ -271,10 +266,7 @@ public void testKeepBinary() throws Exception { for (int i = 0; i < 50; i++) assertEquals(i * 100, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -282,12 +274,7 @@ public void testKeepBinary() throws Exception { */ @Test public void testKeepBinaryFiltered() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); - + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { IgniteCache binaryCache = cache.withKeepBinary(); IgniteBiPredicate filter = new IgniteBiPredicate() { @@ -311,8 +298,54 @@ public void testKeepBinaryFiltered() throws Exception { for (int i = 0; i < 5; i++) assertEquals(i * 1000, res.get(i).intValue()); + }); + } + + /** @throws Exception If failed. */ + @Test + public void testLocal() { + IgniteCache cache = createTestCache(); + + Affinity aff = affinity(cache); + + BiConsumer putToCache = (grid, i) -> put(grid, cache, i, new Value("str" + i, i * 100)); + + AtomicInteger k = new AtomicInteger(); + + List keys0 = new ArrayList<>(50); + List keys1 = new ArrayList<>(50); + + for (int i = 0; i < 50; i++) { + keys0.add(keyForNode(aff, k, grid(0).localNode())); + keys1.add(keyForNode(aff, k, grid(1).localNode())); + } + + keys0.forEach(i -> putToCache.accept(grid(0), i)); + keys1.forEach(i -> putToCache.accept(grid(1), i)); + + BiConsumer> check = (grid, keys) -> { + List res = grid.cache("test-cache").query( + new ScanQuery().setLocal(true), + Cache.Entry::getKey + ).getAll(); + + assertEquals(50, res.size()); + assertTrue(res.containsAll(keys)); + }; + + try { + if (txMode == TestTransactionMode.NONE) { + check.accept(grid(0), keys0); + check.accept(grid(1), keys1); + } + else { + txAction(grid(0), () -> check.accept(grid(0), keys0)); + txAction(grid(1), () -> check.accept(grid(1), keys1)); + } } finally { + clearTransaction(); + cache.destroy(); } } @@ -321,13 +354,10 @@ public void testKeepBinaryFiltered() throws Exception { * @throws Exception If failed. */ @Test - public void testLocal() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); + public void testLocalCompute() throws Exception { + assumeTrue(txMode == TestTransactionMode.NONE); + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { Collection> lists = grid().compute().broadcast(new IgniteCallable>() { @IgniteInstanceResource private Ignite ignite; @@ -353,10 +383,7 @@ public void testLocal() throws Exception { for (int i = 0; i < 50; i++) assertEquals(i * 100, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -364,12 +391,9 @@ public void testLocal() throws Exception { */ @Test public void testLocalFiltered() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); + assumeTrue(txMode == TestTransactionMode.NONE); + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { Collection> lists = grid().compute().broadcast(new IgniteCallable>() { @IgniteInstanceResource private Ignite ignite; @@ -401,10 +425,7 @@ public void testLocalFiltered() throws Exception { for (int i = 0; i < 5; i++) assertEquals(i * 1000, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -412,12 +433,9 @@ public void testLocalFiltered() throws Exception { */ @Test public void testLocalKeepBinary() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); + assumeTrue(txMode == TestTransactionMode.NONE); + doTestWithCache(i -> new Value("str" + i, i * 100), cache -> { Collection> lists = grid().compute().broadcast(new IgniteCallable>() { @IgniteInstanceResource private Ignite ignite; @@ -443,10 +461,7 @@ public void testLocalKeepBinary() throws Exception { for (int i = 0; i < 50; i++) assertEquals(i * 100, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -454,12 +469,11 @@ public void testLocalKeepBinary() throws Exception { */ @Test public void testLocalKeepBinaryFiltered() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); + assumeTrue(txMode == TestTransactionMode.NONE); - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); + IgniteCache cache = createTestCache(i -> new Value("str" + i, i * 100)); + try { Collection> lists = grid().compute().broadcast(new IgniteCallable>() { @IgniteInstanceResource private Ignite ignite; @@ -502,7 +516,7 @@ public void testLocalKeepBinaryFiltered() throws Exception { */ @Test public void testUnsupported() throws Exception { - final IgniteCache cache = grid().createCache("test-cache"); + final IgniteCache cache = createTestCache(); final IgniteClosure, Integer> transformer = new IgniteClosure, Integer>() { @@ -591,15 +605,10 @@ public void testUnsupported() throws Exception { */ @Test public void testPageSize() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); - int numEntries = 10_000; int pageSize = 3; - try { - for (int i = 0; i < numEntries; i++) - cache.put(i, new Value("str" + i, i)); - + doTestWithCache(i -> new Value("str" + i, i), numEntries, cache -> { IgniteClosure, Integer> transformer = new IgniteClosure, Integer>() { @Override public Integer apply(Cache.Entry e) { @@ -618,10 +627,7 @@ public void testPageSize() throws Exception { for (int i = 0; i < numEntries; i++) assertEquals(i, res.get(i).intValue()); - } - finally { - cache.destroy(); - } + }); } /** @@ -629,12 +635,11 @@ public void testPageSize() throws Exception { */ @Test public void testLocalInjection() throws Exception { - IgniteCache cache = grid().createCache("test-cache"); + assumeTrue(txMode == TestTransactionMode.NONE); - try { - for (int i = 0; i < 50; i++) - cache.put(i, new Value("str" + i, i * 100)); + IgniteCache cache = createTestCache(i -> new Value("str" + i, i * 100)); + try { Collection> lists = grid().compute().broadcast(new IgniteCallable>() { @IgniteInstanceResource private Ignite ignite; @@ -667,6 +672,28 @@ public void testLocalInjection() throws Exception { } } + /** */ + private IgniteCache createTestCache(IntFunction val) { + return createTestCache(val, 50); + } + + /** */ + private IgniteCache createTestCache(IntFunction val, int numEntries) { + IgniteCache cache = createTestCache(); + + for (int i = 0; i < numEntries; i++) + put(grid(), cache, i, val.apply(i)); + + return cache; + + } + + /** */ + private IgniteCache createTestCache() { + return grid().createCache(new CacheConfiguration("test-cache") + .setAtomicityMode(atomicity())); + } + /** */ private static class Value { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java new file mode 100644 index 0000000000000..60dba0260a1dc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionIsolationTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.Cache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.SERVER; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_JDBC; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_CACHE_API; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ExecutorType.THIN_VIA_QUERY; +import static org.apache.ignite.internal.processors.cache.query.AbstractQueryTransactionIsolationTest.ModifyApi.QUERY; + +/** */ +public class ScanQueryTransactionIsolationTest extends AbstractQueryTransactionIsolationTest { + /** @return Test parameters. */ + @Parameterized.Parameters( + name = "gridCnt={0},backups={1},partitionAwareness={2},mode={3},execType={4},modify={5},commit={6},multi={7},txConcurrency={8}") + public static Collection parameters() { + List params = new ArrayList<>(); + + for (int gridCnt : new int[]{1, 3}) { + int[] backups = gridCnt > 1 + ? new int[]{1, gridCnt - 1} + : new int[]{0}; + + for (int backup : backups) { + for (CacheMode mode : CacheMode.values()) { + for (ModifyApi modify : new ModifyApi[]{ModifyApi.CACHE, ModifyApi.ENTRY_PROCESSOR}) { + for (boolean commit : new boolean[]{false, true}) { + for (boolean mutli : new boolean[]{false, true}) { + for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { + for (ExecutorType execType : new ExecutorType[]{SERVER, ExecutorType.CLIENT}) { + params.add(new Object[]{ + gridCnt, + backup, + false, //partition awareness + mode, + execType, + modify, + commit, + mutli, + txConcurrency + }); + } + + for (boolean partitionAwareness : new boolean[]{false, true}) { + params.add(new Object[]{ + gridCnt, + backup, + partitionAwareness, + mode, + THIN_VIA_QUERY, // executor type + modify, + commit, + mutli, + txConcurrency + }); + } + } + } + } + } + } + } + } + + return params; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected User select(Integer id, ModifyApi api) { + assertTrue(type != THIN_VIA_CACHE_API); + assertTrue(type != THIN_JDBC); + + if (api == QUERY) { + ScanQuery qry = new ScanQuery() + .setFilter((id0, user) -> Objects.equals(id0, id)); + + boolean withTrasformer = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + boolean useGetAll = ThreadLocalRandom.current().nextBoolean(); + boolean useCacheIter = (type == SERVER || type == CLIENT) && ThreadLocalRandom.current().nextBoolean(); + + if (!withTrasformer) { + if (useCacheIter) { + assertTrue(type == SERVER || type == CLIENT); + + List> res = + toList(F.iterator0(node().cache(users()), true, e -> Objects.equals(e.getKey(), id))); + + assertTrue(F.size(res) + "", F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + else { + QueryCursor> cursor = null; + + if (type == THIN_VIA_QUERY) + cursor = thinCli.cache(users()).query(qry); + else if (type == SERVER || type == CLIENT) + cursor = node().cache(users()).query(qry); + else + fail("Unsupported executor type: " + type); + + List> res = toList(cursor, useGetAll); + + assertTrue("useGetAll=" + useGetAll + ", useCacheIter=" + useCacheIter, F.size(res) <= 1); + + return F.isEmpty(res) ? null : res.get(0).getValue(); + } + } + else { + assertTrue(type == SERVER || type == CLIENT); + + List res = toList(node().cache(users()).query(qry, Cache.Entry::getValue), useGetAll); + + assertTrue("withTransformer=" + withTrasformer + ", useGetAll=" + useGetAll, F.size(res) <= 1); + + return F.first(res); + } + } + + return super.select(id, api); + } + + /** */ + private static List toList(QueryCursor cursor, boolean useGetAll) { + return useGetAll ? cursor.getAll() : toList(cursor.iterator()); + } + + /** */ + private static List toList(Iterator iter) { + List res = new ArrayList<>(); + + iter.forEachRemaining(res::add); + + return res; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionsUnsupportedModesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionsUnsupportedModesTest.java new file mode 100644 index 0000000000000..190cd00484fcf --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryTransactionsUnsupportedModesTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.HashSet; +import java.util.Set; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.Test; + +/** */ +public class ScanQueryTransactionsUnsupportedModesTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true); + + return cfg; + } + + /** */ + @Test + public void testUnsupportedTransactionModes() throws Exception { + try (IgniteEx srv = startGrid(0)) { + for (boolean client : new boolean[] {false, true}) { + IgniteEx executor = client ? startClientGrid() : srv; + + IgniteCache c = executor.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + Set supported = new HashSet<>(TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES); + + for (TransactionIsolation isolation : TransactionIsolation.values()) { + try (Transaction ignored = executor.transactions().txStart(concurrency, isolation)) { + c.query(new ScanQuery<>()).getAll(); + + assertTrue(supported.remove(isolation)); + } + catch (CacheException e) { + assertFalse(supported.contains(isolation)); + } + } + + assertTrue(supported.isEmpty()); + } + } + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java index e0013831966cf..4d598517c695a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryReservationOnUnstableTopologyTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxMultiNodeBasicTest; +import org.apache.ignite.internal.processors.cache.query.ScanQueryTransactionIsolationTest; +import org.apache.ignite.internal.processors.cache.query.ScanQueryTransactionsUnsupportedModesTest; import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest; import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest; import org.apache.ignite.internal.processors.query.LocalQueryLazyTest; @@ -96,6 +98,9 @@ IgniteCacheQueryReservationOnUnstableTopologyTest.class, SqlAffinityCacheTest.class, + + ScanQueryTransactionsUnsupportedModesTest.class, + ScanQueryTransactionIsolationTest.class, }) public class IgniteBinaryCacheQueryTestSuite4 { /** Setup lazy mode default. */