Skip to content

Commit

Permalink
IGNITE-14823 Future abbrevation (#11116)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Dec 21, 2023
1 parent b583b7d commit 2689972
Show file tree
Hide file tree
Showing 33 changed files with 169 additions and 169 deletions.
4 changes: 2 additions & 2 deletions modules/checkstyle/src/main/resources/abbrevations.csv
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ event,evt
events,evts
exception,e,e2,ex
execute,exec
#frequency,freq
#future,fut
frequency,freq
future,fut
#group,grp
#handler,hnd
#header,hdr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,7 @@ public IgniteInternalFuture<?> initCoordinatorCaches(
assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
", total=" + exchFuts.size() + ']';

GridDhtPartitionsExchangeFuture futureToFetchAff = null;
GridDhtPartitionsExchangeFuture futToFetchAff = null;

for (int i = idx + 1; i < exchFuts.size(); i++) {
GridDhtPartitionsExchangeFuture prev = exchFuts.get(i);
Expand All @@ -1998,40 +1998,40 @@ public IgniteInternalFuture<?> initCoordinatorCaches(
if (prev.isMerged())
continue;

futureToFetchAff = prev;
futToFetchAff = prev;

break;
}

if (futureToFetchAff == null)
if (futToFetchAff == null)
throw new IgniteCheckedException("Failed to find completed exchange future to fetch affinity.");

if (log.isDebugEnabled()) {
log.debug("Need initialize affinity on coordinator [" +
"cacheGrp=" + desc.cacheOrGroupName() +
"prevAff=" + futureToFetchAff.topologyVersion() + ']');
"prevAff=" + futToFetchAff.topologyVersion() + ']');
}

GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(
cctx,
desc.groupId(),
futureToFetchAff.topologyVersion(),
futureToFetchAff.events().discoveryCache()
futToFetchAff.topologyVersion(),
futToFetchAff.events().discoveryCache()
);

fetchFut.init(false);

final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();

final GridDhtPartitionsExchangeFuture futureToFetchAffinity0 = futureToFetchAff;
final GridDhtPartitionsExchangeFuture futToFetchAffinity0 = futToFetchAff;

fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
@Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
throws IgniteCheckedException {
fetchAffinity(
futureToFetchAffinity0.topologyVersion(),
futureToFetchAffinity0.events(),
futureToFetchAffinity0.events().discoveryCache(),
futToFetchAffinity0.topologyVersion(),
futToFetchAffinity0.events(),
futToFetchAffinity0.events().discoveryCache(),
aff,
(GridDhtAssignmentFetchFuture)fetchFut
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,17 +1380,17 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
if (!onEnterIfNoStop(gate))
return;

IgniteFuture<?> destroyFuture;
IgniteFuture<?> destroyFut;

try {
destroyFuture = delegate.destroyAsync();
destroyFut = delegate.destroyAsync();
}
finally {
onLeave(gate);
}

if (destroyFuture != null)
destroyFuture.get();
if (destroyFut != null)
destroyFut.get();
}

/** {@inheritDoc} */
Expand All @@ -1405,17 +1405,17 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) {
if (!onEnterIfNoStop(gate))
return;

IgniteFuture<?> closeFuture;
IgniteFuture<?> closeFut;

try {
closeFuture = closeAsync();
closeFut = closeAsync();
}
finally {
onLeave(gate);
}

if (closeFuture != null)
closeFuture.get();
if (closeFut != null)
closeFut.get();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4796,10 +4796,10 @@ else if (res.resultType() == ResultType.FILTERED) {
else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry();

IgniteInternalFuture<?> lockFuture =
IgniteInternalFuture<?> lockFut0 =
cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion());

lockFuture.listen(this);
lockFut0.listen(this);

return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion topVer) {

for (GridCacheFuture<?> fut : futs.values()) {
if (fut instanceof GridDhtTxFinishFuture) {
GridDhtTxFinishFuture finishTxFuture = (GridDhtTxFinishFuture)fut;
GridDhtTxFinishFuture finishTxFut = (GridDhtTxFinishFuture)fut;

if (cctx.tm().needWaitTransaction(finishTxFuture.tx(), topVer))
res.add(ignoreErrors(finishTxFuture));
if (cctx.tm().needWaitTransaction(finishTxFut.tx(), topVer))
res.add(ignoreErrors(finishTxFut));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,10 +966,10 @@ public IgniteInternalFuture<?> partitionReleaseFuture(AffinityTopologyVersion to
f.add(mvcc().finishAtomicUpdates(topVer));
f.add(mvcc().finishDataStreamerUpdates(topVer));

IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer);
IgniteInternalFuture<?> finishLocalTxsFut = tm().finishLocalTxs(topVer);
// To properly track progress of finishing local tx updates we explicitly add this future to compound set.
f.add(finishLocalTxsFuture);
f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer));
f.add(finishLocalTxsFut);
f.add(tm().finishAllTxs(finishLocalTxsFut, topVer));

f.markInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3785,9 +3785,9 @@ public synchronized void acceptFile(File part) {
if (o == null || getClass() != o.getClass())
return false;

RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
RemoteSnapshotFilesRecevier fut = (RemoteSnapshotFilesRecevier)o;

return Objects.equals(reqId, future.reqId);
return Objects.equals(reqId, fut.reqId);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2037,9 +2037,9 @@ private PartitionRestoreFuture(int partId, AtomicInteger cntr) {
if (o == null || getClass() != o.getClass())
return false;

PartitionRestoreFuture future = (PartitionRestoreFuture)o;
PartitionRestoreFuture fut = (PartitionRestoreFuture)o;

return partId == future.partId;
return partId == fut.partId;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,29 +674,29 @@ private void initializeSemaphore() throws IgniteCheckedException {
int numPermits) {
acquire(numPermits);

Future<T> passedInCallableFuture = ctx.kernalContext().pools().getExecutorService().submit(callable);
Future<T> passedInCallableFut = ctx.kernalContext().pools().getExecutorService().submit(callable);

final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
@Override public T get() {
try {
return passedInCallableFuture.get();
return passedInCallableFut.get();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
};

IgniteFuture<T> future = new IgniteFutureImpl<>(fut);
IgniteFuture<T> fut0 = new IgniteFutureImpl<>(fut);

future.listen(new IgniteInClosure<IgniteFuture<T>>() {
fut0.listen(new IgniteInClosure<IgniteFuture<T>>() {
/** {@inheritDoc} */
@Override public void apply(IgniteFuture<T> igniteFuture) {
release(numPermits);
}
});

return future;
return fut0;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ private PlatformCompute(

PlatformCallable callable = new PlatformCallable(func, ptr, funcName);

IgniteInternalFuture future = compute.affinityCallAsync(cacheNames, part, callable);
IgniteInternalFuture fut = compute.affinityCallAsync(cacheNames, part, callable);

return wrapListenable(readAndListenFuture(reader, future));
return wrapListenable(readAndListenFuture(reader, fut));
}

case OP_AFFINITY_CALL: {
Expand All @@ -207,9 +207,9 @@ private PlatformCompute(

PlatformCallable callable = new PlatformCallable(func, ptr, callableName);

IgniteInternalFuture future = compute.affinityCallAsync(Collections.singletonList(cacheName), key, callable);
IgniteInternalFuture fut = compute.affinityCallAsync(Collections.singletonList(cacheName), key, callable);

return wrapListenable(readAndListenFuture(reader, future));
return wrapListenable(readAndListenFuture(reader, fut));
}

case OP_AFFINITY_RUN_PARTITION: {
Expand All @@ -221,9 +221,9 @@ private PlatformCompute(

PlatformRunnable runnable = new PlatformRunnable(func, ptr, runnableName);

IgniteInternalFuture future = compute.affinityRunAsync(cacheNames, part, runnable);
IgniteInternalFuture fut = compute.affinityRunAsync(cacheNames, part, runnable);

return wrapListenable(readAndListenFuture(reader, future));
return wrapListenable(readAndListenFuture(reader, fut));
}

case OP_AFFINITY_RUN: {
Expand All @@ -235,9 +235,9 @@ private PlatformCompute(

PlatformRunnable runnable = new PlatformRunnable(func, ptr, runnableName);

IgniteInternalFuture future = compute.affinityRunAsync(Collections.singleton(cacheName), key, runnable);
IgniteInternalFuture fut = compute.affinityRunAsync(Collections.singleton(cacheName), key, runnable);

return wrapListenable(readAndListenFuture(reader, future));
return wrapListenable(readAndListenFuture(reader, fut));
}

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,18 +373,18 @@ public void start() {
*/
public void updateAllLocalStatistics() {
try {
GridCompoundFuture<Boolean, Boolean> compoundFuture = new GridCompoundFuture<>(CU.boolReducer());
GridCompoundFuture<Boolean, Boolean> compoundFut = new GridCompoundFuture<>(CU.boolReducer());

distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> {
StatisticsObjectConfiguration cfg = (StatisticsObjectConfiguration)v;

compoundFuture.add(updateLocalStatisticsAsync(cfg));
compoundFut.add(updateLocalStatisticsAsync(cfg));
});

compoundFuture.markInitialized();
compoundFut.markInitialized();

compoundFuture.listen(() -> {
if (compoundFuture.error() == null && !compoundFuture.result())
compoundFut.listen(() -> {
if (compoundFut.error() == null && !compoundFut.result())
mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
});
}
Expand Down Expand Up @@ -486,26 +486,26 @@ public IgniteInternalFuture<Boolean> dropStatisticsAsync(List<StatisticsTarget>
if (log.isDebugEnabled())
log.debug("Drop statistics [targets=" + targets + ']');

GridFutureAdapter<Boolean> resultFuture = new GridFutureAdapter<>();
IgniteInternalFuture<Boolean> chainFuture = new GridFinishedFuture<>(true);
GridFutureAdapter<Boolean> resultFut = new GridFutureAdapter<>();
IgniteInternalFuture<Boolean> chainFut = new GridFinishedFuture<>(true);

for (StatisticsTarget target : targets) {
chainFuture = chainFuture.chainCompose(f -> {
chainFut = chainFut.chainCompose(f -> {
if (f.error() == null && f.result() == Boolean.TRUE)
return removeFromMetastore(target, validate);

return f;
});
}

chainFuture.listen(f -> {
chainFut.listen(f -> {
if (f.error() != null)
resultFuture.onDone(f.error());
resultFut.onDone(f.error());
else
resultFuture.onDone(f.result() == null || f.result().booleanValue());
resultFut.onDone(f.result() == null || f.result().booleanValue());
});

return resultFuture;
return resultFut;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ public Job(ComputeTask<T, R> remoteTask, @Nullable T arg) {

compute.execute(remoteTask, arg);

ComputeTaskFuture<R> future = compute.future();
ComputeTaskFuture<R> fut = compute.future();

this.future = future;
this.future = fut;

jobCtx.holdcc();

future.listen(new IgniteInClosure<IgniteFuture<R>>() {
fut.listen(new IgniteInClosure<IgniteFuture<R>>() {
@Override public void apply(IgniteFuture<R> future) {
jobCtx.callcc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testJoinedNodeCanStealJobs() throws Exception {

jobExecutedLatch = new CountDownLatch(threadsNum);

final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Runnable() {
final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
/**
*
*/
Expand Down Expand Up @@ -220,7 +220,7 @@ public void testJoinedNodeCanStealJobs() throws Exception {
info("Metrics [nodeId=" + g.cluster().localNode().id() +
", metrics=" + g.cluster().localNode().metrics() + ']');

future.get();
fut.get();

assertNull("Test failed with exception: ", fail.get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ public void testClusterGroupLocalIdAfterClientReconnect() throws Exception {
startGrid(0);

// wait for client reconnect
IgniteFuture future = client.cluster().clientReconnectFuture();
IgniteFuture fut = client.cluster().clientReconnectFuture();

assertNotNull(future);
assertNotNull(fut);

future.get(20_000); // throws if times out
fut.get(20_000); // throws if times out

ClusterGroup cg2 = client.cluster().forLocal();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ public void testDeactivateDuringEvictionAndRebalance() throws Exception {

Set<Integer> addedKeys = new GridConcurrentHashSet<>();

IgniteInternalFuture cacheLoadFuture = GridTestUtils.runMultiThreadedAsync(
IgniteInternalFuture cacheLoadFut = GridTestUtils.runMultiThreadedAsync(
() -> {
while (!stop.get()) {
int key = keyCounter.incrementAndGet();
Expand Down Expand Up @@ -718,7 +718,7 @@ public void testDeactivateDuringEvictionAndRebalance() throws Exception {

stop.set(true);

cacheLoadFuture.get();
cacheLoadFut.get();

// Deactivate and activate again.
srv.cluster().state(INACTIVE);
Expand Down
Loading

0 comments on commit 2689972

Please sign in to comment.