Skip to content

Commit

Permalink
IGNITE-22733 Support transactions in Scan query (apache#11580)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored and luchnikovbsk committed Jan 31, 2025
1 parent de55817 commit 37be13d
Show file tree
Hide file tree
Showing 20 changed files with 1,177 additions and 412 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
Expand Down Expand Up @@ -346,6 +347,7 @@ public static Collection<QueryTxEntry> transactionChanges(
* @param cmp Comparator to sort new and updated entries.
* @return First, set of object changed in transaction, second, list of transaction data in required format.
* @param <R> Required type.
* @see GridCacheContext#transactionChanges(Integer)
*/
public <R> TransactionChanges<R> transactionChanges(
int cacheId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,12 @@ public IgniteClientFuture<Void> removeAllConflictAsync(Map<? extends K, GridCach
/** Handle scan query. */
private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
writeCacheInfo(payloadCh);
writeCacheInfo(
payloadCh,
payloadCh.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.TX_AWARE_QUERIES)
? transactions.tx()
: null
);

BinaryOutputStream out = payloadCh.out();

Expand All @@ -1155,7 +1160,7 @@ private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {

return new ClientQueryCursor<>(new ClientQueryPager<>(
ch,
null,
transactions.tx(),
ClientOperation.QUERY_SCAN,
ClientOperation.QUERY_SCAN_CURSOR_GET_PAGE,
qryWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
Expand Down Expand Up @@ -3536,8 +3537,17 @@ private Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary,

final CacheOperationContext opCtx = ctx.operationContextPerCall();

final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(p, null, keepBinary, null)
.executeScanQuery();
final TransactionChanges<Object> txChanges = ctx.transactionChanges(null);

final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(
p,
null,
null,
keepBinary,
false,
null,
txChanges.changedKeys()
).executeScanQuery(txChanges.newAndUpdatedEntries());

return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
@Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -36,6 +37,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.expiry.EternalExpiryPolicy;
Expand Down Expand Up @@ -83,9 +85,11 @@
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
Expand Down Expand Up @@ -2350,6 +2354,53 @@ public void dumpListener(DumpEntryChangeListener dumpEntryChangeLsnr) {
this.dumpLsnr = dumpEntryChangeLsnr;
}

/**
* @param part Partition.
* @return First, set of object changed in transaction, second, list of transaction data in required format.
* @see ExecutionContext#transactionChanges(int, int[], Function)
*/
public TransactionChanges<Object> transactionChanges(Integer part) {
if (!U.isTxAwareQueriesEnabled(ctx))
return TransactionChanges.empty();

IgniteInternalTx tx = tm().tx();

if (tx == null)
return TransactionChanges.empty();

IgniteTxManager.ensureTransactionModeSupported(tx.isolation());

Set<KeyCacheObject> changedKeys = new HashSet<>();
List<Object> newAndUpdatedRows = new ArrayList<>();

for (IgniteTxEntry e : tx.writeEntries()) {
if (e.cacheId() != cacheId)
continue;

int epart = e.key().partition();

assert epart != -1;

if (part != null && epart != part)
continue;

changedKeys.add(e.key());

CacheObject val = e.value();

boolean hasEntryProcessors = !F.isEmpty(e.entryProcessors());

if (hasEntryProcessors)
val = e.applyEntryProcessors(val);

// Mix only updated or inserted entries. In case val == null entry removed.
if (val != null)
newAndUpdatedRows.add(hasEntryProcessors ? F.t(e.key(), val) : e);
}

return new TransactionChanges<>(changedKeys, newAndUpdatedRows);
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, igniteInstanceName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
Expand Down Expand Up @@ -464,9 +465,9 @@ public IgniteCacheProxy<K, V> gatewayWrapper() {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private <T, R> QueryCursor<R> query(
final ScanQuery scanQry,
@Nullable final IgniteClosure<T, R> transformer,
private <R> QueryCursor<R> query(
final ScanQuery<K, V> scanQry,
@Nullable final IgniteClosure<Cache.Entry<K, V>, R> transformer,
@Nullable ClusterGroup grp
) throws IgniteCheckedException {
GridCacheContext<K, V> ctx = getContextSafe();
Expand All @@ -477,8 +478,10 @@ private <T, R> QueryCursor<R> query(

IgniteBiPredicate<K, V> p = scanQry.getFilter();

TransactionChanges<Object> txChanges = ctx.transactionChanges(scanQry.getPartition());

final CacheQuery<R> qry = ctx.queries().createScanQuery(
p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal(), null);
p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal(), null, txChanges.changedKeys());

if (scanQry.getPageSize() > 0)
qry.pageSize(scanQry.getPageSize());
Expand All @@ -489,7 +492,7 @@ private <T, R> QueryCursor<R> query(
final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
cacheName, ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
@Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
return qry.executeScanQuery();
return qry.executeScanQuery(txChanges.newAndUpdatedEntries());
}
}, true);

Expand Down Expand Up @@ -830,7 +833,7 @@ private QueryCursor<Cache.Entry<K, V>> queryContinuous(AbstractContinuousQuery q
null, keepBinary, true).get(0);

if (qry instanceof ScanQuery)
return query((ScanQuery)qry, null, projection(qry.isLocal()));
return query((ScanQuery<K, V>)qry, null, projection(qry.isLocal()));

return (QueryCursor<R>)query(qry, projection(qry.isLocal()));
}
Expand Down Expand Up @@ -860,7 +863,7 @@ private QueryCursor<Cache.Entry<K, V>> queryContinuous(AbstractContinuousQuery q

validate(qry);

return query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal()));
return query((ScanQuery<K, V>)qry, (IgniteClosure<Cache.Entry<K, V>, R>)transformer, projection(qry.isLocal()));
}
catch (Exception e) {
if (e instanceof CacheException)
Expand Down
Loading

0 comments on commit 37be13d

Please sign in to comment.