Skip to content

Commit

Permalink
fix:优化ConsumerAPI实例数量,减少相关资源重复注册 (#65)
Browse files Browse the repository at this point in the history
* chroe:update polaris-java dep version to 1.15.4

* fix:优化ConsumerAPI实例数量,减少相关资源重复注册
  • Loading branch information
chuntaojun authored May 11, 2024
1 parent 460ca58 commit 3e40867
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private List<URL> toUrlWithEmpty(URL providerUrl, List<URL> urls) {
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
LOGGER.info("[polaris] unsubscribe service: {}", url.toString());
ServiceListener serviceListener = serviceListeners.get(listener);
ServiceListener serviceListener = serviceListeners.get(url);
if (null != serviceListener) {
polarisOperator.unwatchService(url.getServiceInterface(), serviceListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.cluster.RouterFactory;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.pojo.Instance;
Expand All @@ -34,7 +31,6 @@
import com.tencent.polaris.common.registry.ConvertUtils;
import com.tencent.polaris.common.registry.PolarisOperator;
import com.tencent.polaris.common.registry.PolarisOperators;
import com.tencent.polaris.common.utils.ExtensionConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.tencent.polaris.common.registry;

import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.core.ProviderAPI;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.factory.ConfigAPIFactory;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.factory.api.RouterAPIFactory;
import com.tencent.polaris.factory.config.ConfigurationImpl;
import com.tencent.polaris.ratelimit.api.core.LimitAPI;
import com.tencent.polaris.ratelimit.factory.LimitAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;

public class PolarisClient {

private static final Logger LOGGER = LoggerFactory.getLogger(PolarisClient.class);

private final SDKContext sdkContext;

private final ConsumerAPI consumerAPI;

private final ProviderAPI providerAPI;

private final LimitAPI limitAPI;

private final RouterAPI routerAPI;

private final CircuitBreakAPI circuitBreakAPI;

public PolarisClient(String registryAddress, String configAddress) {
ConfigurationImpl configuration = (ConfigurationImpl) ConfigAPIFactory.defaultConfig();
configuration.setDefault();
configuration.getGlobal().getServerConnector()
.setAddresses(Collections.singletonList(registryAddress));
configuration.getConfigFile().getServerConnector()
.setAddresses(Collections.singletonList(configAddress));
configuration.getConsumer().getLocalCache().setPersistEnable(true);
sdkContext = SDKContext.initContextByConfig(configuration);
consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
providerAPI = DiscoveryAPIFactory.createProviderAPIByContext(sdkContext);
limitAPI = LimitAPIFactory.createLimitAPIByContext(sdkContext);
routerAPI = RouterAPIFactory.createRouterAPIByContext(sdkContext);
circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByContext(sdkContext);
}

public void destroy() {
sdkContext.close();
}

public ConsumerAPI getConsumerAPI() {
return consumerAPI;
}

public ProviderAPI getProviderAPI() {
return providerAPI;
}

public LimitAPI getLimitAPI() {
return limitAPI;
}

public RouterAPI getRouterAPI() {
return routerAPI;
}

public CircuitBreakAPI getCircuitBreakAPI() {
return circuitBreakAPI;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public PolarisConfig(String host, int port, Map<String, String> parameters) {
configAddress = String.format("%s:%d", host, Consts.CONFIG_PORT);

String namespaceStr = parameters.get(Consts.KEY_NAMESPACE);
if (null == namespaceStr || namespaceStr.length() == 0) {
if (null == namespaceStr || namespaceStr.isEmpty()) {
namespaceStr = Consts.DEFAULT_NAMESPACE;
}
this.namespace = namespaceStr;
this.token = parameters.get(Consts.KEY_TOKEN);
int healthTTL = Consts.DEFAULT_TTL;
String ttlStr = System.getProperty(Consts.KEY_TTL);
if (null != ttlStr && ttlStr.length() > 0) {
if (null != ttlStr && !ttlStr.isEmpty()) {
try {
healthTTL = Integer.parseInt(ttlStr);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.core.ProviderAPI;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.plugin.circuitbreaker.ResourceStat;
import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.RetStatus;
Expand All @@ -43,19 +42,12 @@
import com.tencent.polaris.api.rpc.UnWatchServiceRequest;
import com.tencent.polaris.api.rpc.WatchServiceRequest;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.configuration.api.core.ConfigFileService;
import com.tencent.polaris.configuration.factory.ConfigFileServiceFactory;
import com.tencent.polaris.factory.ConfigAPIFactory;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.factory.api.RouterAPIFactory;
import com.tencent.polaris.factory.config.ConfigurationImpl;
import com.tencent.polaris.ratelimit.api.core.LimitAPI;
import com.tencent.polaris.ratelimit.api.rpc.Argument;
import com.tencent.polaris.ratelimit.api.rpc.QuotaRequest;
import com.tencent.polaris.ratelimit.api.rpc.QuotaResponse;
import com.tencent.polaris.ratelimit.factory.LimitAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI;
import com.tencent.polaris.router.api.rpc.ProcessLoadBalanceRequest;
import com.tencent.polaris.router.api.rpc.ProcessLoadBalanceResponse;
Expand All @@ -65,31 +57,30 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.polaris.com.google.protobuf.Message;

public class PolarisOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(PolarisOperator.class);

private final PolarisConfig polarisConfig;

private SDKContext sdkContext;
private final PolarisClient polarisClient;

private ConsumerAPI consumerAPI;

private ProviderAPI providerAPI;

private LimitAPI limitAPI;

private RouterAPI routerAPI;

private CircuitBreakAPI circuitBreakAPI;

public PolarisOperator(String host, int port, Map<String, String> parameters, BootConfigHandler... handlers) {
polarisConfig = new PolarisConfig(host, port, parameters);
/**
* 北极星操作的封装,由于北极星 SDK 本身是支持跨 namespace 操作的,因此只需要保证每一个北极星服务端接入地址中的 host+port 映射一个唯一的 PolarisClient 中,
* 可减少北极星 SDK 实例的数量
*
* @param polarisClient {@link PolarisClient}
* @param host 注册中心服务端IP
* @param port 注册中心服务端端口
* @param parameters URL 额外参数信息
* @param handlers 解析 URL 参数信息,并设置到 PolarisJava 的 SDKContext 中
*/
public PolarisOperator(PolarisClient polarisClient, String host, int port, Map<String, String> parameters, BootConfigHandler... handlers) {
this.polarisClient = polarisClient;
this.polarisConfig = new PolarisConfig(host, port, parameters);
init(parameters, handlers);
}

Expand All @@ -100,21 +91,14 @@ private void init(Map<String, String> parameters, BootConfigHandler... handlers)
.setAddresses(Collections.singletonList(polarisConfig.getRegistryAddress()));
configuration.getConfigFile().getServerConnector()
.setAddresses(Collections.singletonList(polarisConfig.getConfigAddress()));
if (null != handlers && handlers.length > 0) {
if (null != handlers) {
for (BootConfigHandler bootConfigHandler : handlers) {
bootConfigHandler.handle(parameters, configuration);
}
}
sdkContext = SDKContext.initContextByConfig(configuration);
consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
providerAPI = DiscoveryAPIFactory.createProviderAPIByContext(sdkContext);
limitAPI = LimitAPIFactory.createLimitAPIByContext(sdkContext);
routerAPI = RouterAPIFactory.createRouterAPIByContext(sdkContext);
circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByContext(sdkContext);
}

public void destroy() {
sdkContext.close();
}

/**
Expand All @@ -139,7 +123,7 @@ public void register(String service, String host, int port, String protocol, Str
instanceRegisterRequest.setMetadata(metadata);
instanceRegisterRequest.setProtocol(protocol);
instanceRegisterRequest.setToken(token);
InstanceRegisterResponse response = providerAPI.registerInstance(instanceRegisterRequest);
InstanceRegisterResponse response = getProviderAPI().registerInstance(instanceRegisterRequest);
LOGGER.info("register result is {} for service {}", response, service);
}

Expand All @@ -151,7 +135,7 @@ public void deregister(String service, String host, int port) {
instanceDeregisterRequest.setPort(port);
instanceDeregisterRequest.setHost(host);
instanceDeregisterRequest.setToken(polarisConfig.getToken());
providerAPI.deRegister(instanceDeregisterRequest);
getProviderAPI().deRegister(instanceDeregisterRequest);
LOGGER.info("[POLARIS] deregister service {}", service);
}

Expand All @@ -160,7 +144,7 @@ public boolean watchService(String service, ServiceListener listener) {
watchServiceRequest.setNamespace(polarisConfig.getNamespace());
watchServiceRequest.setService(service);
watchServiceRequest.setListeners(Collections.singletonList(listener));
return consumerAPI.watchService(watchServiceRequest).isSuccess();
return getConsumerAPI().watchService(watchServiceRequest).isSuccess();
}

public void unwatchService(String service, ServiceListener serviceListener) {
Expand All @@ -170,7 +154,7 @@ public void unwatchService(String service, ServiceListener serviceListener) {
.service(service)
.listeners(Collections.singletonList(serviceListener))
.build();
consumerAPI.unWatchService(watchServiceRequest);
getConsumerAPI().unWatchService(watchServiceRequest);
}

/**
Expand All @@ -184,7 +168,7 @@ public Instance[] getAvailableInstances(String service, boolean includeCircuitBr
request.setNamespace(polarisConfig.getNamespace());
request.setService(service);
request.setIncludeCircuitBreakInstances(includeCircuitBreakInstances);
InstancesResponse instances = consumerAPI.getHealthyInstances(request);
InstancesResponse instances = getConsumerAPI().getHealthyInstances(request);
return instances.getInstances();
}

Expand All @@ -205,7 +189,7 @@ public void reportInvokeResult(String service, String method, String host, int p
serviceCallResult.setRetStatus(retStatus);
serviceCallResult.setRetCode(code);
serviceCallResult.setCallerIp(callerIp);
consumerAPI.updateServiceCallResult(serviceCallResult);
getConsumerAPI().updateServiceCallResult(serviceCallResult);
}

public List<Instance> route(String service, String method, Set<RouteArgument> arguments, List<Instance> instances) {
Expand All @@ -217,7 +201,7 @@ public List<Instance> route(String service, String method, Set<RouteArgument> ar
request.setDstInstances(defaultServiceInstances);
request.setMethod(method);
request.setSourceService(serviceInfo);
ProcessRoutersResponse processRoutersResponse = routerAPI.processRouters(request);
ProcessRoutersResponse processRoutersResponse = getRouterAPI().processRouters(request);
return processRoutersResponse.getServiceInstances().getInstances();
}

Expand All @@ -229,7 +213,7 @@ public Instance loadBalance(String service, String hashKey, List<Instance> insta
Criteria criteria = new Criteria();
criteria.setHashKey(hashKey);
processLoadBalanceRequest.setCriteria(criteria);
ProcessLoadBalanceResponse processLoadBalanceResponse = routerAPI.processLoadBalance(processLoadBalanceRequest);
ProcessLoadBalanceResponse processLoadBalanceResponse = getRouterAPI().processLoadBalance(processLoadBalanceRequest);
return processLoadBalanceResponse.getTargetInstance();
}

Expand All @@ -245,46 +229,39 @@ public QuotaResponse getQuota(String service, String method, Set<Argument> argum
quotaRequest.setMethod(method);
quotaRequest.setArguments(arguments);
quotaRequest.setCount(1);
return limitAPI.getQuota(quotaRequest);
return getLimitAPI().getQuota(quotaRequest);
}

public ServiceRule getServiceRule(String service, EventType eventType) {
GetServiceRuleRequest getServiceRuleRequest = new GetServiceRuleRequest();
getServiceRuleRequest.setNamespace(polarisConfig.getNamespace());
getServiceRuleRequest.setService(service);
getServiceRuleRequest.setRuleType(eventType);
ServiceRuleResponse serviceRule = consumerAPI.getServiceRule(getServiceRuleRequest);
ServiceRuleResponse serviceRule = getConsumerAPI().getServiceRule(getServiceRuleRequest);
return serviceRule.getServiceRule();
}

public List<ServiceInfo> getServices() {
GetServicesRequest getServicesRequest = new GetServicesRequest();
getServicesRequest.setNamespace(polarisConfig.getNamespace());
ServicesResponse services = consumerAPI.getServices(getServicesRequest);
return services.getServices();
}

public PolarisConfig getPolarisConfig() {
return polarisConfig;
}

public ConsumerAPI getConsumerAPI() {
return consumerAPI;
return polarisClient.getConsumerAPI();
}

public ProviderAPI getProviderAPI() {
return providerAPI;
return polarisClient.getProviderAPI();
}

public LimitAPI getLimitAPI() {
return limitAPI;
return polarisClient.getLimitAPI();
}

public RouterAPI getRouterAPI() {
return routerAPI;
return polarisClient.getRouterAPI();
}

public CircuitBreakAPI getCircuitBreakAPI() {
return circuitBreakAPI;
return polarisClient.getCircuitBreakAPI();
}
}
Loading

0 comments on commit 3e40867

Please sign in to comment.