diff --git a/balancing/src/main/java/ru/hh/jclient/common/balancing/BalancingUpstreamManager.java b/balancing/src/main/java/ru/hh/jclient/common/balancing/BalancingUpstreamManager.java index 31eae001..5e9a4e71 100644 --- a/balancing/src/main/java/ru/hh/jclient/common/balancing/BalancingUpstreamManager.java +++ b/balancing/src/main/java/ru/hh/jclient/common/balancing/BalancingUpstreamManager.java @@ -1,7 +1,6 @@ package ru.hh.jclient.common.balancing; import java.util.Collection; -import java.util.List; import java.util.Map; import static java.util.Objects.requireNonNull; import java.util.Set; @@ -45,7 +44,7 @@ public void updateUpstreams(Collection upstreams) { } private void updateUpstream(@Nonnull String upstreamName) { - List servers = serverStore.getServers(upstreamName); + Set servers = serverStore.getServers(upstreamName); if (servers.isEmpty() && serverStore.getInitialSize(upstreamName).filter(val -> val > 0).isPresent()) { monitoring.forEach(m -> m.countUpdateIgnore(upstreamName, datacenter)); @@ -62,14 +61,16 @@ private void updateUpstream(@Nonnull String upstreamName) { if (upstream == null) { upstream = createUpstream(upstreamName, newConfig, servers); } else { - upstream.update(newConfig, servers); + // TODO REPLACE servers.stream().toList() WITH SET + upstream.update(newConfig, servers.stream().toList()); } return upstream; }); } - private Upstream createUpstream(String upstreamName, UpstreamConfigs upstreamConfigs, List servers) { - return new Upstream(upstreamName, upstreamConfigs, servers, datacenter, allowCrossDCRequests); + private Upstream createUpstream(String upstreamName, UpstreamConfigs upstreamConfigs, Set servers) { + // TODO REPLACE servers.stream().toList() WITH SET + return new Upstream(upstreamName, upstreamConfigs, servers.stream().toList(), datacenter, allowCrossDCRequests); } @Override diff --git a/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStore.java b/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStore.java index 4bbb6d6b..43333297 100644 --- a/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStore.java +++ b/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStore.java @@ -1,11 +1,11 @@ package ru.hh.jclient.common.balancing; import java.util.Collection; -import java.util.List; import java.util.Optional; +import java.util.Set; public interface ServerStore { - List getServers(String serviceName); + Set getServers(String upstreamName); Optional getInitialSize(String serviceName); void updateServers(String serviceName, Collection aliveServers, Collection deadServers); } diff --git a/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStoreImpl.java b/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStoreImpl.java index 5e9414f6..b34a8ffc 100644 --- a/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStoreImpl.java +++ b/balancing/src/main/java/ru/hh/jclient/common/balancing/ServerStoreImpl.java @@ -1,8 +1,8 @@ package ru.hh.jclient.common.balancing; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -14,11 +14,11 @@ public class ServerStoreImpl implements ServerStore { private final Map initialCapacities = new HashMap<>(); @Override - public List getServers(String serviceName) { + public Set getServers(String upstreamName) { return Optional - .ofNullable(serverList.get(serviceName)) - .map(List::copyOf) - .orElseGet(List::of); + .ofNullable(serverList.get(upstreamName)) + .map(Set::copyOf) + .orElseGet(Collections::emptySet); } @Override diff --git a/balancing/src/main/java/ru/hh/jclient/consul/UpstreamServiceImpl.java b/balancing/src/main/java/ru/hh/jclient/consul/UpstreamServiceImpl.java index 7cbfaba8..9d8a58a3 100644 --- a/balancing/src/main/java/ru/hh/jclient/consul/UpstreamServiceImpl.java +++ b/balancing/src/main/java/ru/hh/jclient/consul/UpstreamServiceImpl.java @@ -1,7 +1,9 @@ package ru.hh.jclient.consul; +import io.netty.util.NetUtil; import java.math.BigInteger; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -231,12 +233,13 @@ private void checkServersForAllUpstreamsExist(boolean throwIfError, List } } - void updateUpstreams(Map upstreams, String serviceName, String datacenter) { - Set currentServers = serverStore - .getServers(serviceName) + void updateUpstreams(Map upstreams, String upstreamName, String datacenter) { + // ServerStore is backed by a SET which MUST have ordering guarantees + LinkedHashSet currentServers = serverStore + .getServers(upstreamName) .stream() .filter(server -> datacenter.equals(server.getDatacenter())) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(LinkedHashSet::new)); Map serverToRemoveByAddress = currentServers.stream().collect(toMap(Server::getAddress, Function.identity())); @@ -249,7 +252,14 @@ void updateUpstreams(Map upstreams, String serv Service service = serviceHealth.getService(); - String address = Server.addressFromHostPort(getAddress(serviceHealth), service.getPort()); + // A known constraint. We do not allow upstream names because it floods DNS server with resolve requests. + String ipAddress = serviceHealth.getService().getAddress(); + if (!isValidIpAddress(ipAddress)) { + LOGGER.warn("Invalid ip address supplied {}", ipAddress); + continue; + } + + String address = Server.addressFromHostPort(ipAddress, service.getPort()); String nodeDatacenter = serviceHealth.getNode().getDatacenter().map(this::restoreOriginalDataCenterName).orElse(null); int serverWeight = service.getWeights().orElse(defaultWeight).getPassing(); @@ -261,16 +271,20 @@ void updateUpstreams(Map upstreams, String serv } server.update(serverWeight, service.getMeta(), service.getTags()); } - serverStore.updateServers(serviceName, currentServers, serverToRemoveByAddress.values()); + serverStore.updateServers(upstreamName, currentServers, serverToRemoveByAddress.values()); LOGGER.info( "upstreams for {} were updated in DC {}; alive servers: {}, dead servers: {}", - serviceName, + upstreamName, datacenter, LOGGER.isDebugEnabled() ? currentServers : currentServers.size(), LOGGER.isDebugEnabled() ? serverToRemoveByAddress.values() : serverToRemoveByAddress.values().size() ); } + private static boolean isValidIpAddress(String address) { + return NetUtil.isValidIpV4Address(address) || NetUtil.isValidIpV6Address(address); + } + private boolean notSameNode(String nodeName) { return !StringUtils.isBlank(currentNode) && !currentNode.equalsIgnoreCase(nodeName); } @@ -284,15 +298,6 @@ private String restoreOriginalDataCenterName(String lowerCasedDcName) { return restoredDc; } - private static String getAddress(ServiceHealth serviceHealth) { - String address = serviceHealth.getService().getAddress(); - if (!StringUtils.isBlank(address)) { - return address; - } - - return serviceHealth.getNode().getAddress(); - } - ServerStore getUpstreamStore() { return serverStore; } diff --git a/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTest.java b/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTest.java index a656a176..93ccb4f9 100644 --- a/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTest.java +++ b/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTest.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import static java.util.Collections.singletonList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,9 +43,9 @@ public class BalancingClientTest extends BalancingClientTestBase { @Test public void testBalancing() throws Exception { Server server1 = new Server("server1", null, 10, null); - Server server2 = new Server("server1", null, 5, null); - Server server3 = new Server("server1", null, 1, null); - List servers = List.of( + Server server2 = new Server("server2", null, 5, null); + Server server3 = new Server("server3", null, 1, null); + Set servers = Set.of( server1, server2, server3 @@ -69,14 +70,14 @@ public void testBalancing() throws Exception { getTestClient().get(); } - assertEquals(0, servers.get(0).getCurrentRequests()); - assertEquals(6, servers.get(0).getStatsRequests()); + assertEquals(0, server1.getCurrentRequests()); + assertEquals(6, server1.getStatsRequests()); - assertEquals(0, servers.get(1).getCurrentRequests()); - assertEquals(3, servers.get(1).getStatsRequests()); + assertEquals(0, server2.getCurrentRequests()); + assertEquals(3, server2.getStatsRequests()); - assertEquals(0, servers.get(2).getCurrentRequests()); - assertEquals(1, servers.get(2).getStatsRequests()); + assertEquals(0, server3.getCurrentRequests()); + assertEquals(1, server3.getStatsRequests()); } @Test @@ -85,7 +86,7 @@ public void testBalancingCrossDc() throws Exception { Server server1 = new Server("server1", null, 11, currentDC); Server server2 = new Server("server2", null, 5, "DC2"); Server server3 = new Server("server3", null, 1, null); - List servers = List.of( + Set servers = Set.of( server1, server2, server3 @@ -106,11 +107,11 @@ public void testBalancingCrossDc() throws Exception { for (int i = 0; i < servers.stream().mapToInt(Server::getWeight).max().getAsInt() - 1; i++) { getTestClient().get(); } - assertEquals(10, servers.get(0).getStatsRequests()); - assertEquals(0, servers.get(1).getStatsRequests()); - assertEquals(0, servers.get(2).getStatsRequests()); + assertEquals(10, server1.getStatsRequests()); + assertEquals(0, server2.getStatsRequests()); + assertEquals(0, server3.getStatsRequests()); - List noCurrentDcServers = List.of(server2, server3); + Set noCurrentDcServers = Set.of(server2, server3); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(noCurrentDcServers); upstreamManager.updateUpstreams(Set.of(TEST_UPSTREAM)); @@ -118,19 +119,18 @@ public void testBalancingCrossDc() throws Exception { getTestClient().get(); } - assertEquals(3, noCurrentDcServers.get(0).getStatsRequests()); - assertEquals(1, noCurrentDcServers.get(1).getStatsRequests()); + assertEquals(3, server2.getStatsRequests()); + assertEquals(1, server3.getStatsRequests()); } @Test public void testAddServer() throws Exception { + Set servers = new LinkedHashSet<>(); Server existingServer = new Server("server1", null, 3, null); - List servers = new ArrayList<>(); servers.add(existingServer); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); - when(configStore.getUpstreamConfig(TEST_UPSTREAM)).thenReturn(ApplicationConfig.toUpstreamConfigs(applicationConfig, DEFAULT)); createHttpClientFactory(List.of(TEST_UPSTREAM)); @@ -144,21 +144,26 @@ public void testAddServer() throws Exception { getTestClient().get(); Server newServer = new Server("server2", null, 3, null); servers.add(newServer); + getTestClient().get(); - assertEquals(2, servers.get(0).getStatsRequests()); + assertEquals(2, existingServer.getStatsRequests()); assertNotEquals(newServer.getAddress(), calledAddresses.get(calledAddresses.size() - 1)); - assertEquals(1, servers.get(1).getStatsRequests()); + assertEquals(1, newServer.getStatsRequests()); + getTestClient().get(); assertEquals(newServer.getAddress(), calledAddresses.get(calledAddresses.size() - 1)); - assertEquals(2, servers.get(1).getStatsRequests()); - assertEquals(2, servers.get(0).getStatsRequests()); + assertEquals(2, newServer.getStatsRequests()); + assertEquals(2, existingServer.getStatsRequests()); } @Test public void testNoSlowStart() throws Exception { Server server1 = new Server("server1", null, 3, null); Server server2 = new Server("server2", null, 3, null); - List servers = List.of(server1, server2); + Set servers = new LinkedHashSet<>(){{ + add(server1); + add(server2); + }}; when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); @@ -194,7 +199,9 @@ protected long getCurrentTimeMillis(Clock clock) { return currentTimeMillis.get(); } }; - var servers = List.of(server1, server2); + Set servers = new LinkedHashSet<>(); + servers.add(server1); + servers.add(server2); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); int slowStartInterval = 2; @@ -228,9 +235,9 @@ protected long getCurrentTimeMillis(Clock clock) { @Test public void testBalancingWithCrossDC() throws Exception { Server server1 = new Server("server1", null, 10, "anotherDC"); - Server server2 = new Server("server1", null, 5, null); - Server server3 = new Server("server1", null, 1, null); - List servers = List.of( + Server server2 = new Server("server2", null, 5, null); + Server server3 = new Server("server3", null, 1, null); + Set servers = Set.of( server1, server2, server3 @@ -255,14 +262,14 @@ public void testBalancingWithCrossDC() throws Exception { getTestClient().get(); } - assertEquals(0, servers.get(0).getCurrentRequests()); - assertEquals(0, servers.get(0).getStatsRequests()); + assertEquals(0, server1.getCurrentRequests()); + assertEquals(0, server1.getStatsRequests()); - assertEquals(0, servers.get(1).getCurrentRequests()); - assertEquals(4, servers.get(1).getStatsRequests()); + assertEquals(0, server2.getCurrentRequests()); + assertEquals(4, server2.getStatsRequests()); - assertEquals(0, servers.get(2).getCurrentRequests()); - assertEquals(1, servers.get(2).getStatsRequests()); + assertEquals(0, server3.getCurrentRequests()); + assertEquals(1, server3.getStatsRequests()); } @Test @@ -279,7 +286,7 @@ profileBar, new Profile().setRequestTimeoutSec(11f) when(configStore.getUpstreamConfig(TEST_UPSTREAM)).thenReturn(ApplicationConfig.toUpstreamConfigs(applicationConfig, DEFAULT)); - when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(List.of(new Server("server1", null, 1, null))); + when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(Set.of(new Server("server1", null, 1, null))); List upstreamList = List.of(TEST_UPSTREAM); createHttpClientFactory(upstreamList); @@ -374,7 +381,7 @@ public void requestWithUnknownUpstream() throws Exception { @Test public void requestAndUpdateServers() throws Exception { when(serverStore.getServers(TEST_UPSTREAM)) - .thenReturn(List.of(new Server("server1", null, 1, null))); + .thenReturn(Set.of(new Server("server1", null, 1, null))); createHttpClientFactory(); Request[] request = new Request[1]; @@ -387,14 +394,16 @@ public void requestAndUpdateServers() throws Exception { getTestClient().get(); assertHostEquals(request[0], "server1"); when(serverStore.getServers(TEST_UPSTREAM)) - .thenReturn(List.of(new Server("server2", null, 1, null))); + .thenReturn(Set.of(new Server("server2", null, 1, null))); upstreamManager.updateUpstreams(Set.of(TEST_UPSTREAM)); getTestClient().get(); assertHostEquals(request[0], "server2"); - when(serverStore.getServers(TEST_UPSTREAM)) - .thenReturn(List.of(new Server("server2", null, 1, null), new Server("server3", null, 1, null))); + Set servers = new LinkedHashSet<>(); + servers.add(new Server("server2", null, 1, null)); + servers.add(new Server("server3", null, 1, null)); + when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); upstreamManager.updateUpstreams(Set.of(TEST_UPSTREAM)); getTestClient().get(); @@ -419,14 +428,14 @@ public void shouldNotRetryRequestTimeoutForPost() { try { getTestClient().post(); } catch (Exception e) { - assertRequestEquals(request, "server1"); + assertRequestEquals(request, Set.of("server1")); debug.assertCalled(REQUEST, RESPONSE, FINISHED); } } @Test public void disallowCrossDCRequests() throws Exception { - List servers = List.of(new Server("server1", null, 1, "DC1"), new Server("server2", null, 1, "DC2")); + Set servers = Set.of(new Server("server1", null, 1, "DC1"), new Server("server2", null, 1, "DC2")); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); createHttpClientFactory(List.of(TEST_UPSTREAM), "DC1", false); @@ -452,7 +461,7 @@ public void disallowCrossDCRequests() throws Exception { public void balancedRequestMonitoring() throws Exception { String datacenter = "DC1"; createHttpClientFactory(List.of(TEST_UPSTREAM), datacenter, false); - when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(List.of(new Server("server1", null, 1, datacenter))); + when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(Set.of(new Server("server1", null, 1, datacenter))); upstreamManager.updateUpstreams(Set.of(TEST_UPSTREAM)); when(httpClient.executeRequest(isA(Request.class), isA(CompletionHandler.class))) @@ -501,14 +510,14 @@ public void unbalancedRequestMonitoring() throws Exception { public void failIfNoBackendAvailableInCurrentDC() throws Exception { createHttpClientFactory(List.of(TEST_UPSTREAM), "DC1", false); when(serverStore.getServers(TEST_UPSTREAM)) - .thenReturn(List.of(new Server("server1", null, 1, "DC2"))); + .thenReturn(Set.of(new Server("server1", null, 1, "DC2"))); getTestClient().get(); } @Test public void testAllowCrossDCRequests() throws Exception { - List servers = List.of(new Server("server1", null, 1, "DC1"), new Server("server2", null, 1, "DC2")); + Set servers = Set.of(new Server("server1", null, 1, "DC1"), new Server("server2", null, 1, "DC2")); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); createHttpClientFactory(List.of(TEST_UPSTREAM), "DC1", true); @@ -526,7 +535,7 @@ public void testAllowCrossDCRequests() throws Exception { getTestClient().get(); assertHostEquals(request[0], "server1"); - when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(List.of(new Server("server2", null, 1, "DC2"))); + when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(Set.of(new Server("server2", null, 1, "DC2"))); upstreamManager.updateUpstreams(Set.of(TEST_UPSTREAM)); getTestClient().get(); @@ -577,7 +586,7 @@ protected long getCurrentTimeMillis(Clock clock) { } }; - var servers = List.of(server1); + var servers = Set.of(server1); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); @@ -609,7 +618,7 @@ protected long getCurrentTimeMillis(Clock clock) { } }; - var servers = List.of(server1); + var servers = Set.of(server1); when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); diff --git a/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTestBase.java b/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTestBase.java index 74298650..75ca849e 100644 --- a/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTestBase.java +++ b/client-tests/src/test/java/ru/hh/jclient/common/BalancingClientTestBase.java @@ -5,6 +5,7 @@ import java.net.ConnectException; import java.util.Arrays; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -65,13 +66,17 @@ public void setUpTest() { httpClient = mock(AsyncHttpClient.class); when(httpClient.getConfig()).thenReturn(httpClientConfig); when(configStore.getUpstreamConfig(TEST_UPSTREAM)).thenReturn(ApplicationConfig.toUpstreamConfigs(new ApplicationConfig(), DEFAULT)); - when(serverStore.getServers(TEST_UPSTREAM)) - .thenReturn(List.of(new Server("server1", null, 1, null), new Server("server2", null, 2, null))); + + LinkedHashSet servers = new LinkedHashSet<>() {{ + add(new Server("server1", null, 1, null)); + add(new Server("server2", null, 2, null)); + }}; + when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); } @Test public void shouldMakeGetRequestForSingleServer() throws Exception { - when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(List.of(new Server("server1", null, 1, null))); + when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(Set.of(new Server("server1", null, 1, null))); createHttpClientFactory(List.of(TEST_UPSTREAM)); @@ -113,7 +118,11 @@ public void retryIOExceptionRemotelyClosed() throws Exception { Request[] request = mockRetryIOException("Remotely closed"); getTestClient().get(); - assertRequestEquals(request, "server1", "server2"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @@ -125,7 +134,11 @@ public void retryIdempotentIOExceptionResetByPeer() throws Exception { Request[] request = mockRetryIOException("Connection reset by peer"); getTestClient().get(); - assertRequestEquals(request, "server1", "server2"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @@ -166,12 +179,12 @@ private Request[] mockRetryIOException(String exceptionText) { @Test public void retryConnectException() throws Exception { - List servers = List.of( - new Server("server1", null, 1, null), - new Server("server2", null, 1, null), - new Server("server3", null, 1, null), - new Server("server4", null, 1, null) - ); + Set servers = new LinkedHashSet<>() {{ + add(new Server("server1", null, 1, null)); + add(new Server("server2", null, 1, null)); + add(new Server("server3", null, 1, null)); + add(new Server("server4", null, 1, null)); + }}; when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); @@ -202,18 +215,24 @@ public void retryConnectException() throws Exception { getTestClient().get(); - assertRequestEquals(request, "server1", "server2", "server3", "server4"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + add("server3"); + add("server4"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RETRY, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @Test public void retry503() throws Exception { - List servers = List.of( - new Server("server1", null, 1, null), - new Server("server2", null, 1, null), - new Server("server3", null, 1, null) - ); + Set servers = new LinkedHashSet<>() {{ + add(new Server("server1", null, 1, null)); + add(new Server("server2", null, 1, null)); + add(new Server("server3", null, 1, null)); + }}; when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); @@ -227,7 +246,12 @@ public void retry503() throws Exception { getTestClient().get(); - assertRequestEquals(request, "server1", "server2", "server3"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + add("server3"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @@ -249,7 +273,11 @@ public void retryTimeoutException() throws Exception { getTestClient().get(); - assertRequestEquals(request, "server1", "server2"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @@ -298,11 +326,11 @@ public void testRequestTimeoutForExternalUrl() throws Exception { @Test public void retry503ForNonIdempotentRequest() throws Exception { - List servers = List.of( - new Server("server1", null, 1, null), - new Server("server2", null, 1, null), - new Server("server3", null, 1, null) - ); + Set servers = new LinkedHashSet<>() {{ + add(new Server("server1", null, 1, null)); + add(new Server("server2", null, 1, null)); + add(new Server("server3", null, 1, null)); + }}; when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); applicationConfig @@ -320,18 +348,23 @@ public void retry503ForNonIdempotentRequest() throws Exception { getTestClient().post(); - assertRequestEquals(request, "server1", "server2", "server3"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + add("server3"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @Test public void retryConnectTimeoutException() throws Exception { - List servers = List.of( - new Server("server1", null, 1, null), - new Server("server2", null, 1, null), - new Server("server3", null, 1, null) - ); + Set servers = new LinkedHashSet<>() {{ + add(new Server("server1", null, 1, null)); + add(new Server("server2", null, 1, null)); + add(new Server("server3", null, 1, null)); + }}; when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); @@ -345,18 +378,23 @@ public void retryConnectTimeoutException() throws Exception { getTestClient().get(); - assertRequestEquals(request, "server1", "server2", "server3"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + add("server3"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @Test public void retryConnectTimeoutExceptionForNonIdempotentRequest() throws Exception { - List servers = List.of( - new Server("server1", null, 1, null), - new Server("server2", null, 1, null), - new Server("server3", null, 1, null) - ); + Set servers = new LinkedHashSet<>() {{ + add(new Server("server1", null, 1, null)); + add(new Server("server2", null, 1, null)); + add(new Server("server3", null, 1, null)); + }}; when(serverStore.getServers(TEST_UPSTREAM)).thenReturn(servers); ApplicationConfig applicationConfig = buildTestConfig(); @@ -369,7 +407,12 @@ public void retryConnectTimeoutExceptionForNonIdempotentRequest() throws Excepti getTestClient().post(); - assertRequestEquals(request, "server1", "server2", "server3"); + LinkedHashSet hosts = new LinkedHashSet<>() {{ + add("server1"); + add("server2"); + add("server3"); + }}; + assertRequestEquals(request, hosts); debug.assertCalled(REQUEST, RESPONSE, RETRY, RESPONSE, RETRY, RESPONSE, RESPONSE_CONVERTED, FINISHED); } @@ -450,14 +493,15 @@ Request failWith(Throwable t, InvocationOnMock iom) { protected abstract boolean isAdaptive(); - void assertRequestEquals(Request[] request, String... actual) { + void assertRequestEquals(Request[] request, Set actual) { if (isAdaptive()) { - assertTrue(toSet(request).containsAll(toSet(actual))); - assertTrue(toSet(actual).containsAll(toSet(request))); + assertTrue(toSet(request).containsAll(actual)); + assertTrue(actual.containsAll(toSet(request))); } else { - assertEquals(request.length, actual.length); - for (int i = 0; i < request.length; i++) { - assertHostEquals(request[i], actual[i]); + assertEquals(request.length, actual.size()); + int i = 0; + for (String host : actual) { + assertHostEquals(request[i++], host); } } } diff --git a/client-tests/src/test/java/ru/hh/jclient/common/balancing/AbstractBalancingStrategyTest.java b/client-tests/src/test/java/ru/hh/jclient/common/balancing/AbstractBalancingStrategyTest.java index c4adb241..316c1e99 100755 --- a/client-tests/src/test/java/ru/hh/jclient/common/balancing/AbstractBalancingStrategyTest.java +++ b/client-tests/src/test/java/ru/hh/jclient/common/balancing/AbstractBalancingStrategyTest.java @@ -198,12 +198,12 @@ public TestStoreFromAddress(String datacenterName, Map> ad } @Override - public List getServers(String serviceName) { + public Set getServers(String upstreamName) { return adressesByWeight .entrySet() .stream() .flatMap(entry -> entry.getValue().stream().map(address -> new Server(address, null, entry.getKey(), datacenterName))) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } @Override diff --git a/client-tests/src/test/java/ru/hh/jclient/common/balancing/BalancingConcurrencyIntegrationTest.java b/client-tests/src/test/java/ru/hh/jclient/common/balancing/BalancingConcurrencyIntegrationTest.java index 61f20ba7..0c50e522 100644 --- a/client-tests/src/test/java/ru/hh/jclient/common/balancing/BalancingConcurrencyIntegrationTest.java +++ b/client-tests/src/test/java/ru/hh/jclient/common/balancing/BalancingConcurrencyIntegrationTest.java @@ -6,6 +6,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -101,7 +102,7 @@ public void testStat() throws InterruptedException { }) .collect(Collectors.toList()) ); - List servers = serverStore.getServers(TEST_UPSTREAM); + Set servers = serverStore.getServers(TEST_UPSTREAM); int minWeight = servers.stream().mapToInt(Server::getWeight).min().getAsInt(); int sumWeight = servers.stream().mapToInt(Server::getWeight).sum(); diff --git a/client-tests/src/test/java/ru/hh/jclient/consul/UpstreamServiceImplTest.java b/client-tests/src/test/java/ru/hh/jclient/consul/UpstreamServiceImplTest.java index d33752d1..997064e0 100644 --- a/client-tests/src/test/java/ru/hh/jclient/consul/UpstreamServiceImplTest.java +++ b/client-tests/src/test/java/ru/hh/jclient/consul/UpstreamServiceImplTest.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -104,8 +105,8 @@ private void mockServiceHealth(List health) { @Test public void testParseServer() { - String address1 = "a1"; - String address2 = "a2"; + String address1 = "192.168.1.1"; + String address2 = "192.168.1.2"; int weight = 12; int port1 = 124; int port2 = 126; @@ -119,7 +120,7 @@ public void testParseServer() { upstreamService.updateUpstreams(upstreams, SERVICE_NAME, DATA_CENTER); - List servers = serverStore.getServers(SERVICE_NAME); + Set servers = serverStore.getServers(SERVICE_NAME); assertEquals(2, servers.size()); Map serverMap = servers.stream().collect(Collectors.toMap(Server::getAddress, Function.identity())); @@ -136,44 +137,50 @@ public void testParseServer() { @Test public void testUpdateReplacedServers() { - - String address1 = "a1"; - String address2 = "a2"; - String address3 = "a3"; + String address1 = "192.168.1.1"; + String address2 = "192.168.1.2"; + String address3 = "192.168.1.3"; int weight = 12; int port1 = 124; int port2 = 126; int port3 = 127; - ServiceHealth serviceHealth = buildServiceHealth(address1, port1, DATA_CENTER, NODE_NAME, weight, true); + ServiceHealth serviceHealth1 = buildServiceHealth(address1, port1, DATA_CENTER, NODE_NAME, weight, true); ServiceHealth serviceHealth2 = buildServiceHealth(address2, port2, DATA_CENTER, NODE_NAME, weight, true); ServiceHealth serviceHealth3 = buildServiceHealth(address3, port3, DATA_CENTER, NODE_NAME, weight, true); - Map upstreams = new HashMap<>(); - upstreams.put(buildKey(address1), serviceHealth); - upstreams.put(buildKey(address2), serviceHealth2); + List serverAddresses = List.of(address1, address2); + Map upstreams = Map.of( + buildKey(address1), serviceHealth1, + buildKey(address2), serviceHealth2 + ); + // Initialize current upstreams with first and second servers upstreamService.updateUpstreams(upstreams, SERVICE_NAME, DATA_CENTER); - List servers = serverStore.getServers(SERVICE_NAME); - assertEquals(2, servers.size()); + List servers = serverStore.getServers(SERVICE_NAME).stream().map(Server::getAddress).toList(); + assertEquals(serverAddresses.size(), servers.size()); + assertTrue(servers.contains(Server.addressFromHostPort(address1, port1))); + assertTrue(servers.contains(Server.addressFromHostPort(address2, port2))); + + List updatedServerAddresses = List.of(address3); + Map updatedUpstreams = Map.of(buildKey(address3), serviceHealth3); - upstreamService.updateUpstreams(Map.of(buildKey(address3), serviceHealth3), SERVICE_NAME, DATA_CENTER); - List updatedServers = serverStore.getServers(SERVICE_NAME); + // Replace upstreams with third and the only server + upstreamService.updateUpstreams(updatedUpstreams, SERVICE_NAME, DATA_CENTER); - assertEquals(1, updatedServers.size()); - Server server = updatedServers.get(0); - assertEquals(Server.addressFromHostPort(address3, port3), server.getAddress()); - assertEquals(weight, server.getWeight()); - assertEquals(DATA_CENTER, server.getDatacenter()); + List updatedServers = serverStore.getServers(SERVICE_NAME).stream().map(Server::getAddress).toList(); + assertEquals(updatedServerAddresses.size(), updatedServers.size()); + assertTrue(updatedServers.contains(Server.addressFromHostPort(address3, port3))); - Server server2 = servers.get(1); - assertEquals(Server.addressFromHostPort(address2, port2), server2.getAddress()); + Server server3 = serverStore.getServers(SERVICE_NAME).stream().findFirst().orElseThrow(); + assertEquals(weight, server3.getWeight()); + assertEquals(DATA_CENTER, server3.getDatacenter()); } @Test public void testSameNode() { - String address1 = "a1"; + String address1 = "192.168.1.1"; int weight = 12; int port1 = 124; UpstreamServiceConsulConfig consulConfig = new UpstreamServiceConsulConfig() @@ -187,7 +194,7 @@ public void testSameNode() { ServiceHealth serviceHealth = buildServiceHealth(address1, port1, DATA_CENTER, NODE_NAME, weight, true); mockServiceHealth(List.of(serviceHealth)); - List servers = new UpstreamServiceImpl(infrastructureConfig, consulClient, serverStore, upstreamManager, consulConfig, List.of()) + Set servers = new UpstreamServiceImpl(infrastructureConfig, consulClient, serverStore, upstreamManager, consulConfig, List.of()) .getUpstreamStore() .getServers(SERVICE_NAME); assertEquals(1, servers.size()); @@ -196,8 +203,8 @@ public void testSameNode() { @Test public void testDifferentNodesInTest() { - String address1 = "a1"; - String address2 = "a2"; + String address1 = "192.168.1.1"; + String address2 = "192.168.1.2"; int weight = 12; int port1 = 124; int port2 = 126; @@ -214,7 +221,7 @@ public void testDifferentNodesInTest() { ServiceHealth serviceHealth2 = buildServiceHealth(address2, port2, DATA_CENTER, "differentNode", weight, true); mockServiceHealth(List.of(serviceHealth, serviceHealth2)); - List servers = new UpstreamServiceImpl(infrastructureConfig, consulClient, serverStore, upstreamManager, consulConfig, List.of()) + Set servers = new UpstreamServiceImpl(infrastructureConfig, consulClient, serverStore, upstreamManager, consulConfig, List.of()) .getUpstreamStore() .getServers(SERVICE_NAME); assertEquals(1, servers.size()); @@ -222,7 +229,7 @@ public void testDifferentNodesInTest() { @Test public void testConcurrentUpdateServers() throws ExecutionException, InterruptedException { - List addresses = List.of("a1", "a2", "a3"); + List addresses = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3"); List dcS = IntStream.range(0, 1000).boxed().map(String::valueOf).collect(Collectors.toList()); int weight = 12; @@ -246,14 +253,14 @@ public void testConcurrentUpdateServers() throws ExecutionException, Interrupted CompletableFuture.allOf(updatedUpstream).get(); - List servers = serverStore.getServers(SERVICE_NAME); + Set servers = serverStore.getServers(SERVICE_NAME); assertEquals(addresses.size() * dcS.size(), servers.size()); } @Test public void testDifferentNodesInProd() { - String address1 = "a1"; - String address2 = "a2"; + String address1 = "192.168.1.1"; + String address2 = "192.168.1.2"; int weight = 12; int port1 = 124; int port2 = 126; @@ -270,7 +277,7 @@ public void testDifferentNodesInProd() { ServiceHealth serviceHealth2 = buildServiceHealth(address2, port2, DATA_CENTER, "differentNode", weight, true); mockServiceHealth(List.of(serviceHealth, serviceHealth2)); - List servers = new UpstreamServiceImpl(infrastructureConfig, consulClient, serverStore, upstreamManager, consulConfig, List.of()) + Set servers = new UpstreamServiceImpl(infrastructureConfig, consulClient, serverStore, upstreamManager, consulConfig, List.of()) .getUpstreamStore() .getServers(SERVICE_NAME); assertEquals(2, servers.size()); @@ -315,7 +322,7 @@ public void testNoServers() { @Test public void testNoServersInCurrentDc() { - ServiceHealth serviceHealth = buildServiceHealth("a1", 1, "notCurrentDc", NODE_NAME, 100, true); + ServiceHealth serviceHealth = buildServiceHealth("192.168.1.1", 1, "notCurrentDc", NODE_NAME, 100, true); mockServiceHealth(List.of(serviceHealth)); UpstreamServiceConsulConfig consulConfig = new UpstreamServiceConsulConfig() @@ -342,7 +349,7 @@ public void testNoServersInCurrentDc() { .size() ); - serviceHealth = buildServiceHealth("a2", 1, DATA_CENTER.toLowerCase(), NODE_NAME, 100, true); + serviceHealth = buildServiceHealth("192.168.1.2", 1, DATA_CENTER.toLowerCase(), NODE_NAME, 100, true); mockServiceHealth(List.of(serviceHealth)); assertEquals( 1,