From 2cdb85e001e8df6465c9d0045b14c3bbdc111d87 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Tue, 24 Dec 2024 16:23:56 +0800 Subject: [PATCH] Enhance the ability to forward requests from Meta to the main Meta --- .../bootstrap/MetaServerConfiguration.java | 48 ++--- .../resource/DataInfoIDBlacklistResource.java | 34 +++- .../resource/filter/LeaderForwardFilter.java | 166 ++++++++++++++++++ .../filter/LeaderForwardRestController.java | 16 ++ .../filter/LeaderForwardFilterTest.java | 126 +++++++++++++ 5 files changed, 346 insertions(+), 44 deletions(-) create mode 100644 server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilter.java create mode 100644 server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardRestController.java create mode 100644 server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilterTest.java diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java index c0bdc4b6d..a1ef1dcf4 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java @@ -22,51 +22,25 @@ import com.alipay.sofa.registry.remoting.bolt.exchange.BoltExchange; import com.alipay.sofa.registry.remoting.exchange.Exchange; import com.alipay.sofa.registry.remoting.jersey.exchange.JerseyExchange; -import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig; -import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfigBean; -import com.alipay.sofa.registry.server.meta.bootstrap.config.MultiClusterMetaServerConfig; -import com.alipay.sofa.registry.server.meta.bootstrap.config.NodeConfig; -import com.alipay.sofa.registry.server.meta.bootstrap.config.NodeConfigBeanProperty; +import com.alipay.sofa.registry.server.meta.bootstrap.config.*; import com.alipay.sofa.registry.server.meta.cleaner.AppRevisionCleaner; import com.alipay.sofa.registry.server.meta.cleaner.InterfaceAppsIndexCleaner; import com.alipay.sofa.registry.server.meta.lease.filter.DefaultForbiddenServerManager; import com.alipay.sofa.registry.server.meta.lease.filter.RegistryForbiddenServerManager; -import com.alipay.sofa.registry.server.meta.provide.data.DefaultClientManagerService; -import com.alipay.sofa.registry.server.meta.provide.data.DefaultProvideDataService; -import com.alipay.sofa.registry.server.meta.provide.data.FetchStopPushService; -import com.alipay.sofa.registry.server.meta.provide.data.NodeOperatingService; -import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService; +import com.alipay.sofa.registry.server.meta.provide.data.*; import com.alipay.sofa.registry.server.meta.remoting.DataNodeExchanger; import com.alipay.sofa.registry.server.meta.remoting.MetaServerExchanger; import com.alipay.sofa.registry.server.meta.remoting.SessionNodeExchanger; import com.alipay.sofa.registry.server.meta.remoting.connection.DataConnectionManager; import com.alipay.sofa.registry.server.meta.remoting.connection.MetaConnectionManager; import com.alipay.sofa.registry.server.meta.remoting.connection.SessionConnectionManager; -import com.alipay.sofa.registry.server.meta.remoting.handler.FetchProvideDataRequestHandler; -import com.alipay.sofa.registry.server.meta.remoting.handler.FetchSystemPropertyRequestHandler; -import com.alipay.sofa.registry.server.meta.remoting.handler.GetSlotTableStatusRequestHandler; -import com.alipay.sofa.registry.server.meta.remoting.handler.HeartbeatRequestHandler; -import com.alipay.sofa.registry.server.meta.remoting.handler.RegistryForbiddenServerHandler; +import com.alipay.sofa.registry.server.meta.remoting.handler.*; import com.alipay.sofa.registry.server.meta.remoting.meta.LocalMetaExchanger; import com.alipay.sofa.registry.server.meta.remoting.meta.MetaServerRenewService; -import com.alipay.sofa.registry.server.meta.resource.BlacklistDataResource; -import com.alipay.sofa.registry.server.meta.resource.CircuitBreakerResources; -import com.alipay.sofa.registry.server.meta.resource.ClientManagerResource; -import com.alipay.sofa.registry.server.meta.resource.CompressResource; -import com.alipay.sofa.registry.server.meta.resource.DataInfoIDBlacklistResource; -import com.alipay.sofa.registry.server.meta.resource.HealthResource; -import com.alipay.sofa.registry.server.meta.resource.MetaCenterResource; -import com.alipay.sofa.registry.server.meta.resource.MetaDigestResource; -import com.alipay.sofa.registry.server.meta.resource.MetaLeaderResource; -import com.alipay.sofa.registry.server.meta.resource.ProvideDataResource; -import com.alipay.sofa.registry.server.meta.resource.RecoverConfigResource; -import com.alipay.sofa.registry.server.meta.resource.RegistryCoreOpsResource; -import com.alipay.sofa.registry.server.meta.resource.ShutdownSwitchResource; -import com.alipay.sofa.registry.server.meta.resource.SlotSyncResource; -import com.alipay.sofa.registry.server.meta.resource.SlotTableResource; -import com.alipay.sofa.registry.server.meta.resource.StopPushDataResource; +import com.alipay.sofa.registry.server.meta.resource.*; import com.alipay.sofa.registry.server.meta.resource.filter.AuthRestFilter; import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareFilter; +import com.alipay.sofa.registry.server.meta.resource.filter.LeaderForwardFilter; import com.alipay.sofa.registry.server.meta.slot.status.SlotTableStatusService; import com.alipay.sofa.registry.server.shared.config.CommonConfig; import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler; @@ -89,12 +63,7 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @author shangyu.wh @@ -338,6 +307,11 @@ public LeaderAwareFilter leaderAwareFilter() { return new LeaderAwareFilter(); } + @Bean + public LeaderForwardFilter leaderForwardFilter() { + return new LeaderForwardFilter(); + } + @Bean public AuthRestFilter authRestFilter() { return new AuthRestFilter(); diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataInfoIDBlacklistResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataInfoIDBlacklistResource.java index ab0903445..1326cc6a1 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataInfoIDBlacklistResource.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataInfoIDBlacklistResource.java @@ -12,7 +12,7 @@ import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataNotifier; import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService; import com.alipay.sofa.registry.server.meta.resource.filter.AuthRestController; -import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController; +import com.alipay.sofa.registry.server.meta.resource.filter.LeaderForwardRestController; import com.alipay.sofa.registry.store.api.DBResponse; import com.alipay.sofa.registry.store.api.OperationStatus; import com.alipay.sofa.registry.util.JsonUtils; @@ -20,10 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.springframework.beans.factory.annotation.Autowired; -import javax.ws.rs.FormParam; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import java.util.HashSet; import java.util.Set; @@ -33,8 +30,7 @@ * @date 2024/12/13 */ @Path("datainfoid/blacklist") -@AuthRestController -@LeaderAwareRestController +@LeaderForwardRestController public class DataInfoIDBlacklistResource { private static final Logger LOGGER = LoggerFactory.getLogger(DataInfoIDBlacklistResource.class); @@ -47,6 +43,7 @@ public class DataInfoIDBlacklistResource { @POST @Path("add") @Produces(MediaType.APPLICATION_JSON) + @AuthRestController public Result addBlackList(@FormParam("dataId") String dataId, @FormParam("group") String group, @FormParam("instanceId") String instanceId) { @@ -61,6 +58,7 @@ public Result addBlackList(@FormParam("dataId") String dataId, @POST @Path("delete") @Produces(MediaType.APPLICATION_JSON) + @AuthRestController public Result deleteBlackList(@FormParam("dataId") String dataId, @FormParam("group") String group, @FormParam("instanceId") String instanceId) { @@ -72,6 +70,28 @@ public Result deleteBlackList(@FormParam("dataId") String dataId, } } + @GET + @Path("query") + @Produces(MediaType.APPLICATION_JSON) + public Result queryBlackList() { + try { + DBResponse queryResponse = + this.provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID); + OperationStatus operationStatus = queryResponse.getOperationStatus(); + if (OperationStatus.SUCCESS.equals(operationStatus)) { + PersistenceData persistenceData = queryResponse.getEntity(); + Result result = Result.success(); + result.setMessage(persistenceData.getData()); + return result; + } else { + return Result.success(); + } + } catch (Throwable throwable) { + LOGGER.error("Query dataid black list exception", throwable); + return Result.failed("Query dataid black list exception"); + } + } + private Result process(String dataId, String group, String instanceId, Operation operation) { // 1. 参数检查 // 检查要处理的 DataId 以及 Group 是否符合规则 diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilter.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilter.java new file mode 100644 index 000000000..4dd743f0c --- /dev/null +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilter.java @@ -0,0 +1,166 @@ +package com.alipay.sofa.registry.server.meta.resource.filter; + +import com.alipay.sofa.registry.log.Logger; +import com.alipay.sofa.registry.log.LoggerFactory; +import com.alipay.sofa.registry.server.meta.MetaLeaderService; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.eclipse.jetty.http.HttpStatus; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.annotation.Priority; +import javax.ws.rs.Priorities; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import javax.ws.rs.ext.Provider; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author huicha + * @date 2024/12/24 + */ +@Provider +@LeaderForwardRestController +@Priority(Priorities.USER) +public class LeaderForwardFilter implements ContainerRequestFilter { + + private Logger LOGGER = LoggerFactory.getLogger(LeaderForwardFilter.class, "[LeaderForwardFilter]"); + + @Autowired + private MetaLeaderService metaLeaderService; + + @Override + public void filter(ContainerRequestContext requestContext) throws IOException { + if (metaLeaderService.amILeader()) { + return; + } + + String leaderAddr = metaLeaderService.getLeader(); + if (StringUtils.isBlank(leaderAddr)) { + Response response = + Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .header("reason", "no leader found") + .build(); + requestContext.abortWith(response); + return; + } + + this.proxyRequestToMetaLeader(requestContext, leaderAddr); + } + + /** + * 当前 Meta 不是 Leader,将请求转发到 Meta Leader 中 + * 这里没考虑直接使用 Http Client 是因为使用到这个 Filter 的基本都是配置下发链路 + * 属于旁路,不需要链接池、内存池等各种资源,因此就不想引入其他依赖了,只使用了 Java 原生的 + * 方法,链接也是使用到的时候再主动去链 + */ + private void proxyRequestToMetaLeader(ContainerRequestContext requestContext, String leaderAddr) { + HttpURLConnection connection = null; + try { + // 1. 获取请求信息 + String method = requestContext.getMethod(); + boolean hasEntity = requestContext.hasEntity(); + MultivaluedMap headers = requestContext.getHeaders(); + UriInfo uriInfo = requestContext.getUriInfo(); + URI requestURI = uriInfo.getAbsolutePath(); + int requestPort = requestURI.getPort(); + String requestPath = requestURI.getRawPath(); + + // 2. 拼接发送给 Meta Leader 的请求地址 + String newRequestURLStr = String.format("http://%s:%d%s", leaderAddr, requestPort, requestPath); + URL newRequestURL = new URL(newRequestURLStr); + + // 3. 打开链接,这里因为协议写死是 HTTP 协议,所以拿到的必然是 HttpURLConnection + connection = (HttpURLConnection) newRequestURL.openConnection(); + connection.setRequestMethod(method); + if (hasEntity) { + connection.setDoOutput(true); + } + connection.setDoInput(true); + + // 设置超时时间 + connection.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(1)); + connection.setReadTimeout((int) TimeUnit.SECONDS.toMillis(3)); + + // 设置请求头 + for (MultivaluedMap.Entry> entry : headers.entrySet()) { + String headerKey = entry.getKey(); + List headerValues = entry.getValue(); + if (CollectionUtils.isNotEmpty(headerValues)) { + connection.addRequestProperty(headerKey, headerValues.get(0)); + } + } + + connection.connect(); + + // 4. 发送请求,直接做一次拷贝 + if (hasEntity) { + try ( + OutputStream outputStream = connection.getOutputStream(); + InputStream inputStream = requestContext.getEntityStream() + ) { + IOUtils.copy(inputStream, outputStream); + } + } + + // 5. 接收响应 + int responseCode = connection.getResponseCode(); + if (responseCode != HttpStatus.OK_200) { + LOGGER.error("Proxy request to meta leader fail, response code: {}, message: {}", responseCode, connection.getResponseMessage()); + Response response = + Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .header("reason", "proxy request to meta leader fail: " + connection.getResponseMessage()) + .build(); + requestContext.abortWith(response); + return; + } + + // 读取数据 + try ( + InputStream inputStream = connection.getInputStream(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024); + ) { + IOUtils.copy(inputStream, outputStream); + byte[] responseData = outputStream.toByteArray(); + Response response = Response.ok(responseData).build(); + requestContext.abortWith(response); + } + } catch (Throwable throwable) { + LOGGER.error("Proxy request to meta leader exception, meta leader address: {}", leaderAddr, throwable); + Response response = + Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .header("reason", "proxy request to meta leader exception") + .build(); + requestContext.abortWith(response); + } finally { + if (null != connection) { + try { + connection.disconnect(); + } catch (Throwable throwable) { + // 吃掉异常 + LOGGER.error("Disconnect connection to meta leader fail, meta leader address: {}", leaderAddr, throwable); + } + } + } + } + + @VisibleForTesting + public LeaderForwardFilter setMetaLeaderService(MetaLeaderService metaLeaderService) { + this.metaLeaderService = metaLeaderService; + return this; + } + +} diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardRestController.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardRestController.java new file mode 100644 index 000000000..0c14d4205 --- /dev/null +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardRestController.java @@ -0,0 +1,16 @@ +package com.alipay.sofa.registry.server.meta.resource.filter; + +import javax.ws.rs.NameBinding; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author huicha + * @date 2024/12/24 + */ +@NameBinding +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(value = RetentionPolicy.RUNTIME) +public @interface LeaderForwardRestController {} diff --git a/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilterTest.java b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilterTest.java new file mode 100644 index 000000000..07c880690 --- /dev/null +++ b/server/server/meta/src/test/java/com/alipay/sofa/registry/server/meta/resource/filter/LeaderForwardFilterTest.java @@ -0,0 +1,126 @@ +package com.alipay.sofa.registry.server.meta.resource.filter; + +import com.alipay.sofa.registry.common.model.store.DataInfo; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.core.model.Result; +import com.alipay.sofa.registry.remoting.jersey.JerseyClient; +import com.alipay.sofa.registry.server.meta.AbstractH2DbTestBase; +import com.alipay.sofa.registry.server.meta.MetaLeaderService; +import com.alipay.sofa.registry.util.JsonUtils; +import com.fasterxml.jackson.core.type.TypeReference; +import org.eclipse.jetty.http.HttpStatus; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Form; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.Set; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author huicha + * @date 2024/12/24 + */ +public class LeaderForwardFilterTest extends AbstractH2DbTestBase { + + @Autowired + private LeaderForwardFilter leaderForwardFilter; + + @Test + public void test() { + AmILeaderAnswer amILeaderAnswer = new AmILeaderAnswer(); + MetaLeaderService metaLeaderService = mock(MetaLeaderService.class); + when(metaLeaderService.amILeader()).then(amILeaderAnswer); + when(metaLeaderService.getLeader()).thenReturn("127.0.0.1"); + this.leaderForwardFilter.setMetaLeaderService(metaLeaderService); + + // 1. 首先请求一次,不触发转发,拿到正常的响应结果 + amILeaderAnswer.setFirstTime(false); + Result firstResult = null; + try (Response firstResponse = this.sendAddBlackListRequest()) { + Assert.assertEquals(HttpStatus.OK_200, firstResponse.getStatus()); + firstResult = firstResponse.readEntity(Result.class); + } + + // 2. 然后再请求一次,触发转发,对比两次请求的结果,这两次请求结果必须一致 + amILeaderAnswer.setFirstTime(true); + + try (Response secondResponse = this.sendAddBlackListRequest()) { + Assert.assertEquals(HttpStatus.OK_200, secondResponse.getStatus()); + Result secondResult = secondResponse.readEntity(Result.class); + + Assert.assertEquals(firstResult.isSuccess(), secondResult.isSuccess()); + Assert.assertEquals(firstResult.getMessage(), secondResult.getMessage()); + } + + // 3. 再测试一个 Get 的例子,由于前面成功添加了一个黑名单,因此后面这里一定能查询到这个添加的数据 + amILeaderAnswer.setFirstTime(true); + try (Response getResponse = this.sendQueryBlackListRequest()) { + Assert.assertEquals(HttpStatus.OK_200, getResponse.getStatus()); + Result getResult = getResponse.readEntity(Result.class); + + Assert.assertTrue(getResult.isSuccess()); + + String blackListJson = getResult.getMessage(); + Set blackList = JsonUtils.read(blackListJson, new TypeReference>() {}); + Assert.assertEquals(1, blackList.size()); + + DataInfo dataInfo = new DataInfo("test-instance-id", "test-data-id", "test-group"); + Assert.assertTrue(blackList.contains(dataInfo.getDataInfoId())); + } + } + + private Response sendQueryBlackListRequest() { + return JerseyClient.getInstance() + .connect(new URL("127.0.0.1", 9615)) + .getWebTarget() + .path("datainfoid/blacklist/query") + .request(MediaType.APPLICATION_JSON_TYPE) + .get(); + } + + private Response sendAddBlackListRequest() { + Form form = new Form(); + form.param("dataId", "test-data-id"); + form.param("group", "test-group"); + form.param("instanceId", "test-instance-id"); + + return JerseyClient.getInstance() + .connect(new URL("127.0.0.1", 9615)) + .getWebTarget() + .path("datainfoid/blacklist/add") + .request(MediaType.APPLICATION_JSON_TYPE) + .buildPost(Entity.form(form)) + .invoke(); + } +} + +class AmILeaderAnswer implements Answer { + + // 单测,这里不考虑并发问题 + private boolean firstTime = true; + + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + if (!firstTime) { + // 不是第一次请求就返回是 Leader,请求不做代理 + return true; + } else { + // 第一次请求,返回不是 Leader,触发请求代理 + this.firstTime = false; + return false; + } + } + + public void setFirstTime(boolean firstTime) { + this.firstTime = firstTime; + } + +} \ No newline at end of file