Skip to content

Commit

Permalink
IGNITE-19872 IgniteTxLocalAdapter initial cleanup (#10812)
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov authored Jun 30, 2023
1 parent 961fdd0 commit a0320dc
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4462,8 +4462,14 @@ public <K> IgniteInternalFuture<GridCacheReturn> lockAllAsync(GridCacheContext c
);
}

/** {@inheritDoc} */
@Override protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) {
/**
* Gets cache entry for given key.
*
* @param cacheCtx Cache context.
* @param key Key.
* @return Cache entry.
*/
protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) {
if (cacheCtx.isColocated()) {
IgniteTxEntry txEntry = entry(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
Expand Down Expand Up @@ -122,7 +121,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
protected GridCacheVersion minVer;

/** Flag indicating with TM commit happened. */
protected volatile int doneFlag;
private volatile int doneFlag;

/** Committed versions, relative to base. */
private Collection<GridCacheVersion> committedVers = Collections.emptyList();
Expand All @@ -134,7 +133,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
private GridCacheVersion completedBase;

/** Commit error. */
protected volatile Throwable commitErr;
private volatile Throwable commitErr;

/** Implicit transaction result. */
protected GridCacheReturn implicitRes;
Expand All @@ -144,10 +143,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig

/** */
@GridToStringInclude
protected IgniteTxLocalState txState;
protected final IgniteTxLocalState txState;

/** */
protected CacheWriteSynchronizationMode syncMode;
private CacheWriteSynchronizationMode syncMode;

/** */
protected volatile boolean qryEnlisted;
Expand Down Expand Up @@ -370,8 +369,7 @@ protected boolean commitAfterLock() {
@Nullable @Override public GridTuple<CacheObject> peek(
GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key
) throws GridCacheFilterFailedException {
KeyCacheObject key) {
IgniteTxEntry e = entry(cacheCtx.txKey(key));

if (e != null)
Expand Down Expand Up @@ -483,7 +481,7 @@ public void calculatePartitionUpdateCounters() throws IgniteTxRollbackCheckedExc
"[tx=" + CU.txString(this) +
", readyTopVer=" + top.readyTopologyVersion() +
", lostParts=" + top.lostPartitions() +
", part=" + part.toString() + ']');
", part=" + part + ']');

throw new IgniteTxRollbackCheckedException("Failed to prepare a transaction on outdated " +
"topology, please try again [timeout=" + timeout() + ", tx=" + CU.txString(this) + ']');
Expand Down Expand Up @@ -518,17 +516,6 @@ assert ownsLockUnsafe(entry) : "Lock is not owned for commit [entry=" + entry +
", tx=" + this + ']';
}

/**
* Gets cache entry for given key.
*
* @param cacheCtx Cache context.
* @param key Key.
* @return Cache entry.
*/
protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) {
return cacheCtx.cache().entryEx(key.key());
}

/**
* Gets cache entry for given key and topology version.
*
Expand Down Expand Up @@ -572,7 +559,7 @@ protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key, A

WALPointer ptr = null;

IgniteCheckedException err = null;
IgniteCheckedException err;

cctx.database().checkpointReadLock();

Expand Down Expand Up @@ -611,10 +598,7 @@ protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key, A

boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer);

boolean metrics = true;

if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped())
metrics = false;
boolean metrics = updateNearCache || !cacheCtx.isNear() || !txEntry.locallyMapped();

boolean evt = !isNearLocallyMapped(txEntry, false);

Expand Down Expand Up @@ -707,8 +691,6 @@ else if (conflictCtx.isUseNew()) {
explicitVer = null;

if (conflictNeedResolve) {
assert cacheCtx.isReplicated() || conflictNeedResolve;

txEntry.value(val, true, false);
txEntry.op(op);
txEntry.entryProcessors(null);
Expand Down Expand Up @@ -831,7 +813,7 @@ else if (op == RELOAD) {
cached.innerReload();

if (updateNearCache)
updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload());
updateNearEntrySafely(cacheCtx, txEntry.key(), GridCacheEntryEx::innerReload);
}
else if (op == READ) {
CacheGroupContext grp = cacheCtx.group();
Expand Down Expand Up @@ -1564,13 +1546,6 @@ else if (explicitCand.dhtLocal())
}
}

/**
* @return Map of affected partitions: cacheId -> partId.
*/
public Map<Integer, Set<Integer>> partsMap() {
return null;
}

/** {@inheritDoc} */
@Override public void touchPartition(int cacheId, int partId) {
txState.touchPartition(cacheId, partId);
Expand Down Expand Up @@ -1601,9 +1576,8 @@ void entryExpiry(IgniteTxKey key, @Nullable ExpiryPolicy expiryPlc) {
* @param key Key.
* @param ttl TTL.
* @param expireTime Expire time.
* @return {@code true} if tx entry exists for this key, {@code false} otherwise.
*/
boolean entryTtlDr(IgniteTxKey key, long ttl, long expireTime) {
void entryTtlDr(IgniteTxKey key, long ttl, long expireTime) {
assert key != null;
assert ttl >= 0;

Expand All @@ -1616,8 +1590,6 @@ boolean entryTtlDr(IgniteTxKey key, long ttl, long expireTime) {

e.expiry(null);
}

return e != null;
}

/**
Expand Down Expand Up @@ -1715,18 +1687,6 @@ protected abstract class PLC2<T> extends PostLockClosure2<T> {
// No-op.
}

/**
* Post-lock closure alias.
*
* @param <T> Return type.
*/
protected abstract class PMC<T> extends PostMissClosure<T> {
/** */
private static final long serialVersionUID = 0L;

// No-op.
}

/**
* Post-lock closure.
*
Expand All @@ -1737,10 +1697,10 @@ protected abstract class PostLockClosure1<T> implements IgniteBiClosure<Boolean,
private static final long serialVersionUID = 0L;

/** Closure argument. */
private T arg;
private final T arg;

/** Commit flag. */
private boolean commit;
private final boolean commit;

/**
* Creates a Post-Lock closure that will pass the argument given to the {@code postLock} method.
Expand Down Expand Up @@ -1892,48 +1852,6 @@ protected abstract class PostLockClosure2<T> implements IgniteBiClosure<Boolean,
protected abstract IgniteInternalFuture<T> postLock() throws IgniteCheckedException;
}

/**
* Post-lock closure.
*
* @param <T> Return type.
*/
protected abstract class PostMissClosure<T> implements IgniteBiClosure<T, Exception, IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;

/** {@inheritDoc} */
@Override public final IgniteInternalFuture<T> apply(T t, Exception e) {
boolean rollback = true;

try {
if (e != null)
throw new GridClosureException(e);

IgniteInternalFuture<T> fut = postMiss(t);

rollback = false;

return fut;
}
catch (IgniteCheckedException ex) {
throw new GridClosureException(ex);
}
finally {
if (rollback)
setRollbackOnly();
}
}

/**
* Post lock callback.
*
* @param t Post-miss parameter.
* @return Future return value.
* @throws IgniteCheckedException If operation failed.
*/
protected abstract IgniteInternalFuture<T> postMiss(T t) throws IgniteCheckedException;
}

/**
* Clojure to perform operations with near cache entries.
*/
Expand Down

0 comments on commit a0320dc

Please sign in to comment.