Skip to content

Commit

Permalink
IGNITE-21399 Fixed logging of reduced cache entries for distributed l…
Browse files Browse the repository at this point in the history
…ong running operations. (apache#11210)

(cherry picked from commit 597302d)
  • Loading branch information
petrov-mg authored and Петр Иванов committed Feb 13, 2024
1 parent 7d0f5e5 commit 7171703
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteClosure;
Expand Down Expand Up @@ -643,7 +644,7 @@ public static <K, V> IgniteReducer<Map<K, V>, Map<K, V>> mapsReducer(final int s

/** {@inheritDoc} */
@Override public String toString() {
return "Map reducer: " + ret;
return S.toString("Map Reducer", "reducedEntries", ret);
}
};
}
Expand Down Expand Up @@ -676,7 +677,7 @@ public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer

/** {@inheritDoc} */
@Override public synchronized String toString() {
return "Collection reducer: " + ret;
return S.toString("Collection Reducer", "reducedElements", ret);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TO_STRING_COLLECTION_LIMIT;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
import static org.apache.ignite.internal.util.tostring.GridToStringBuilder.DFLT_TO_STRING_COLLECTION_LIMIT;

/** */
public class CacheDistributedGetLongRunningFutureDumpTest extends GridCommonAbstractTest {
/** */
private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setGridLogger(listeningLog)
.setCommunicationSpi(new TestRecordingCommunicationSpi());
}

/** */
@Test
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = "5000")
public void test() throws Exception {
IgniteEx ignite = startGrids(3);

IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
.setName(DEFAULT_CACHE_NAME)
.setAtomicityMode(TRANSACTIONAL));

int logColElementsLimit = getInteger(IGNITE_TO_STRING_COLLECTION_LIMIT, DFLT_TO_STRING_COLLECTION_LIMIT);

Set<Key> keys = new TreeSet<>();

for (int i = 0; i < logColElementsLimit * 10; i++) {
Key key = new Key(i);

keys.add(key);

cache.put(key, i);
}

spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridNearGetResponse);

CompletableFuture<String> longRunningOpsLoggedFut = new CompletableFuture<>();

listeningLog.registerListener(str -> {
if (str.startsWith(">>> Future [")) {
longRunningOpsLoggedFut.complete(str);
}
});

IgniteFuture<Map<Object, Object>> getAllFut = cache.getAllAsync(keys);

String longRunningOps = longRunningOpsLoggedFut.get(getTestTimeout(), MILLISECONDS);

Matcher reducedEntriesMatcher = Pattern.compile("rdc=Map Reducer \\[reducedEntries=ConcurrentHashMap \\{(.+)\\}")
.matcher(longRunningOps);

assertTrue(reducedEntriesMatcher.find());

assertEquals(logColElementsLimit, reducedEntriesMatcher.group(1).split(", ").length);

spi(grid(1)).stopBlock();

getAllFut.get(getTestTimeout());
}

/** */
public static class Key implements Comparable<Key> {
/** */
private final int val;

/** */
public Key(int val) {
this.val = val;
}

/** {@inheritDoc} */
@Override public String toString() {
return "Key-Data-" + val;
}

/** {@inheritDoc} */
@Override public int compareTo(@NotNull Key o) {
return Integer.compare(val, o.val);
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;

if (o == null || getClass() != o.getClass())
return false;

return val == ((Key)o).val;
}

/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(val);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.ignite.internal.metric.SystemViewComputeJobTest;
import org.apache.ignite.internal.metric.SystemViewSelfTest;
import org.apache.ignite.internal.processors.cache.CacheClearAsyncDeadlockTest;
import org.apache.ignite.internal.processors.cache.CacheDistributedGetLongRunningFutureDumpTest;
import org.apache.ignite.internal.processors.cache.GridCacheDataTypesCoverageTest;
import org.apache.ignite.internal.processors.cache.GridCacheLongRunningTransactionDiagnosticsTest;
import org.apache.ignite.internal.processors.cache.GridCacheVersionGenerationWithCacheStorageTest;
Expand Down Expand Up @@ -100,6 +101,7 @@ public static List<Class<?>> suite(Collection<Class> ignoredTests) {
GridTestUtils.addTestIfNeeded(suite, CacheClearAsyncDeadlockTest.class, ignoredTests);

GridTestUtils.addTestIfNeeded(suite, TxAsyncOpsSemaphorePermitsExceededTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheDistributedGetLongRunningFutureDumpTest.class, ignoredTests);

return suite;
}
Expand Down

0 comments on commit 7171703

Please sign in to comment.