polaris-java-agent design #116
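## Spring Cloud 2021 plugin design

### Agent parameter initialization

The agent needs a number of parameters for all of its subsequent operations. They come from two sources: parameters injected by the user at startup, and parameters from Spring startup.

#### User-injected startup parameters

```
-Dpolaris.namespace=default -Dpolaris.server.address=host:8091
```

#### Parameters obtained from the Spring context

The interceptor below reads `server.port`, `spring.application.name` and `spring.cloud.client.ip-address` from the Spring `Environment`.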
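The two parameter sources can be merged roughly as follows. This is a minimal sketch, assuming the Spring `Environment` is represented by a plain `Map`; `AgentParamsSketch` and its `AgentParams` holder are hypothetical and are not the project's `PolarisAgentProperties`. It follows the same rules as the interceptor: the namespace falls back to `default`, while the server address, application name and port are mandatory.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: merges -D system properties with Spring settings. */
public final class AgentParamsSketch {

    /** Hypothetical holder for the resolved parameters. */
    static final class AgentParams {
        final String namespace;
        final String serverAddress;
        final String service;
        final int port;

        AgentParams(String namespace, String serverAddress, String service, int port) {
            this.namespace = namespace;
            this.serverAddress = serverAddress;
            this.service = service;
            this.port = port;
        }
    }

    /** springEnv stands in for the Spring Environment (assumption for illustration). */
    static AgentParams resolve(Map<String, String> springEnv) {
        // user-injected parameters: -Dpolaris.namespace / -Dpolaris.server.address
        String namespace = System.getProperty("polaris.namespace");
        if (namespace == null || namespace.isEmpty()) {
            namespace = "default"; // same fallback as the interceptor
        }
        String serverAddress = Objects.requireNonNull(
                System.getProperty("polaris.server.address"),
                "polaris.server.address must be set");
        // parameters taken from the Spring context
        String service = Objects.requireNonNull(
                springEnv.get("spring.application.name"),
                "spring.application.name must be set");
        String port = Objects.requireNonNull(
                springEnv.get("server.port"), "server.port must be set");
        return new AgentParams(namespace, serverAddress, service, Integer.parseInt(port));
    }

    public static void main(String[] args) {
        System.setProperty("polaris.server.address", "127.0.0.1:8091");
        AgentParams p = resolve(Map.of(
                "spring.application.name", "demo-service",
                "server.port", "8080"));
        System.out.println(p.namespace + " " + p.service + ":" + p.port);
    }
}
```

Failing fast with `requireNonNull` matches the `Assert.notNull` checks in the interceptor: a missing server address or application name is a configuration error the agent cannot recover from.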
#### Intercepting Spring startup to obtain the parameters

The Spring web application parameters above are all held in the `WebApplicationContext` during Spring startup, so the agent intercepts once that context is available. Concretely, the interception point is the `refreshContext` method of `SpringApplication`: `run()` calls it after the context has been created and prepared (`prepareContext` has already attached the configured `Environment`):

```java
public ConfigurableApplicationContext run(String... args) {
    long startTime = System.nanoTime();
    DefaultBootstrapContext bootstrapContext = createBootstrapContext();
    ConfigurableApplicationContext context = null;
    configureHeadlessProperty();
    SpringApplicationRunListeners listeners = getRunListeners(args);
    listeners.starting(bootstrapContext, this.mainApplicationClass);
    try {
        ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
        ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
        configureIgnoreBeanInfo(environment);
        Banner printedBanner = printBanner(environment);
        context = createApplicationContext();
        context.setApplicationStartup(this.applicationStartup);
        prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
        // interception point!
        refreshContext(context);
        afterRefresh(context, applicationArguments);
        Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
        if (this.logStartupInfo) {
            new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
        }
        listeners.started(context, timeTakenToStartup);
        callRunners(context, applicationArguments);
    }
    catch (Throwable ex) {
        handleRunFailure(context, ex, listeners);
        throw new IllegalStateException(ex);
    }
    try {
        Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
        listeners.ready(context, timeTakenToReady);
    }
    catch (Throwable ex) {
        handleRunFailure(context, ex, null);
        throw new IllegalStateException(ex);
    }
    return context;
}
```

#### Interception logic

The method takes a single `context` argument, which is the `WebApplicationContext` we need. Get the Spring context's `Environment` from it, then read the properties by the keys configured in `application.yml`.

Note that every Spring context goes through this flow, not only the web application context, so the interceptor must check whether the context is an instance of `GenericReactiveWebApplicationContext` or `GenericWebApplicationContext`.

Once initialized, the agent parameters are stored in a `PolarisAgentProperties` object for use throughout the agent's lifecycle.

```java
@Override
public void beforeInterceptor(Object target, Object[] args, PolarisAgentProperties agentProperties) {
    // only handle servlet or reactive web application contexts
    Object configurableContext = args[0];
    if (configurableContext instanceof GenericWebApplicationContext
            || configurableContext instanceof GenericReactiveWebApplicationContext) {
        LogUtils.logTargetFound(target);
        // actual type, e.g. AnnotationConfigServletWebServerApplicationContext
        // or AnnotationConfigReactiveWebServerApplicationContext
        ApplicationContext applicationContext = (ApplicationContext) configurableContext;
        // keep a reference to the application context for the agent
        SpringContextFactory.setApplicationContext(applicationContext);
        // read basic info from the Spring Environment
        String port = applicationContext.getEnvironment().getProperty("server.port");
        String service = applicationContext.getEnvironment().getProperty("spring.application.name");
        String springHost = applicationContext.getEnvironment().getProperty("spring.cloud.client.ip-address");
        Assert.notNull(port, "the server port can't be null, please check your server config");
        Assert.notNull(service, "the application name can't be null, please check your spring config");
        logger.info("Polaris service is set with port: {}", port);
        logger.info("Polaris service is set with service: {}", service);
        logger.info("Polaris service is set with host: {}", springHost);
        // read init info from the JVM system properties (-D flags)
        String host = HostUtils.getHost();
        String namespace = System.getProperty("polaris.namespace");
        String serverAddress = System.getProperty("polaris.server.address");
        Assert.notNull(serverAddress, "the polaris server address can't be null, please check your polaris agent parameter");
        if (StringUtils.isEmpty(namespace)) {
            namespace = "default";
            logger.warn("the input namespace is empty, use default instead");
        }
        // build the Polaris agent properties and keep them for the agent lifecycle
        PolarisAgentProperties polarisAgentProperties = new PolarisAgentProperties();
        polarisAgentProperties.setHost(host);
        polarisAgentProperties.setPort(Integer.valueOf(port));
        polarisAgentProperties.setProtocol("grpc");
        polarisAgentProperties.setNamespace(namespace);
        polarisAgentProperties.setService(service);
        polarisAgentProperties.setServerAddress(serverAddress);
        PolarisAgentPropertiesFactory.setPolarisAgentProperties(polarisAgentProperties);
        // initialize the PolarisContext and the Polaris APIs
        PolarisContext polarisContext = new PolarisContext(polarisAgentProperties);
        PolarisAPIFactory.init(polarisContext);
    }
}
```

### Service registration

The Spring Cloud service registration abstraction models a service instance as the `Registration` interface and the registration operation as the `ServiceRegistry<Registration>` interface.

#### Overall flow

The `Registration` interface carries the service ID, host and port; it extends `ServiceInstance`:

```java
public interface ServiceInstance {
    default String getInstanceId() {
        return null;
    }
    String getServiceId();
    String getHost();
    int getPort();
    boolean isSecure();
    URI getUri();
    Map<String, String> getMetadata();
    default String getScheme() {
        return null;
    }
}
```

#### ServiceRegistry interface

The interface in the Spring Cloud abstraction that performs service registration. It provides register, deregister, status and close operations:

```java
public interface ServiceRegistry<R extends Registration> {
    /**
     * Registers the registration. A registration typically has information about an
     * instance, such as its hostname and port.
     * @param registration registration meta data
     */
    void register(R registration);

    /**
     * Deregisters the registration.
     * @param registration registration meta data
     */
    void deregister(R registration);

    /**
     * Closes the ServiceRegistry. This is a lifecycle method.
     */
    void close();

    /**
     * Sets the status of the registration. The status values are determined by the
     * individual implementations.
     * @param registration The registration to update.
     * @param status The status to set.
     * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
     */
    void setStatus(R registration, String status);

    /**
     * Gets the status of a particular registration.
     * @param registration The registration to query.
     * @param <T> The type of the status.
     * @return The status of the registration.
     * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
     */
    <T> T getStatus(R registration);
}
```

#### Polaris service registration sequence

##### Initializing the SDKContext

Configure a `Configuration` object through the Polaris API: starting from the default configuration, bind the IP chosen on the Spring Cloud side, for use by the subsequent service registration:

```java
private static Configuration configuration(PolarisAgentProperties polarisAgentProperties) {
    ConfigurationImpl configuration = (ConfigurationImpl) ConfigAPIFactory
            .defaultConfig(ConfigProvider.DEFAULT_CONFIG);
    configuration.setDefault();
    // bind the IP chosen on the Spring Cloud side
    configuration.getGlobal().getAPI().setBindIP(PolarisServiceConstants.host);
    // point the server connector at the address passed via -Dpolaris.server.address
    configuration.getGlobal().getServerConnector()
            .setAddresses(Collections.singletonList(polarisAgentProperties.getServerAddress()));
    return configuration;
}

// initialize the SDKContext
SDKContext.initContextByConfig(configuration(polarisAgentProperties));
```

##### Initializing plugins (the default configuration is extensible)

Using the agent parameters passed in by the user (the server address is mandatory, because it is used to initialize the `ServerConnector`) or the default SPI mechanism, initialize the optional plugins Polaris needs, including `ServerConnector`, `Router`, `HealthChecker`, `LoadBalancer`, `RateLimiter` and `CircuitBreaker`: `Configuration.Extensions.init()`.

##### Initializing ConsumerAPI and ProviderAPI

Through the `SDKContext`, initialize the two Polaris APIs used for service discovery and service registration:

```java
public static void init(PolarisContext polarisContext) {
    CONSUMER_API = DiscoveryAPIFactory.createConsumerAPIByContext(polarisContext.getSdkContext());
    PROVIDER_API = DiscoveryAPIFactory.createProviderAPIByContext(polarisContext.getSdkContext());
}
```

##### Agent interception logic

In the original flow, a `WebServerInitializedEvent` is published once the web server has started, and a listener on that event triggers service registration. Instead, the agent can post-intercept at the moment the web server has finished starting, that is, when the `ApplicationContext` finishes refreshing.

The interception is on the `finishRefresh` method of `AbstractApplicationContext`. Here too every Spring context goes through this flow, so the interceptor must check whether the target is an instance of `GenericReactiveWebApplicationContext` or `GenericWebApplicationContext`:

```java
public void refresh() throws BeansException, IllegalStateException {
    synchronized (this.startupShutdownMonitor) {
        this.prepareRefresh();
        ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
        this.prepareBeanFactory(beanFactory);
        try {
            this.postProcessBeanFactory(beanFactory);
            this.invokeBeanFactoryPostProcessors(beanFactory);
            this.registerBeanPostProcessors(beanFactory);
            this.initMessageSource();
            this.initApplicationEventMulticaster();
            this.onRefresh();
            this.registerListeners();
            this.finishBeanFactoryInitialization(beanFactory);
            this.finishRefresh();
            // interception happens here, after finishRefresh() returns
        } catch (BeansException ex) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + ex);
            }
            this.destroyBeans();
            this.cancelRefresh(ex);
            throw ex;
        } finally {
            this.resetCommonCaches();
        }
    }
}
```

##### Interception code

The interception check:

```java
@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
    // only servlet or reactive web application contexts trigger registration
    if (target instanceof GenericWebApplicationContext || target instanceof GenericReactiveWebApplicationContext) {
        LogUtils.logTargetFound(target);
        this.afterInterceptor(target, args, result, throwable, PolarisAgentPropertiesFactory.getPolarisAgentProperties());
    }
}

@Override
public void afterInterceptor(Object target, Object[] args, Object result, Throwable throwable, PolarisAgentProperties polarisAgentProperties) {
    // delegate to the registration interceptor
    AfterPolarisInterceptor polarisInterceptor = new PolarisRegistryPolarisInterceptor();
    polarisInterceptor.afterInterceptor(target, args, result, throwable, polarisAgentProperties);
}
```

Executing the register logic:

```java
@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for polaris client...");
        return;
    }
    // build the instance registration request
    InstanceRegisterRequest instanceRegisterRequest = new InstanceRegisterRequest();
    instanceRegisterRequest.setNamespace(polarisContext.getPolarisContextAgentProperties().getNamespace());
    instanceRegisterRequest.setService(registration.getServiceId());
    instanceRegisterRequest.setHost(registration.getHost());
    instanceRegisterRequest.setPort(registration.getPort());
    instanceRegisterRequest.setToken(polarisContext.getPolarisContextAgentProperties().getToken());
    if (null != heartbeatExecutor) {
        instanceRegisterRequest.setTtl(ttl);
    }
    // instanceRegisterRequest.setMetadata(metadataLocalProperties.getContent());
    instanceRegisterRequest.setProtocol(polarisContext.getPolarisContextAgentProperties().getProtocol());
    instanceRegisterRequest.setVersion(polarisContext.getPolarisContextAgentProperties().getVersion());
    try {
        ProviderAPI providerClient = PolarisAPIFactory.getProviderApi();
        providerClient.register(instanceRegisterRequest);
        // deregister automatically when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(() -> deregister(registration)));
        if (null != heartbeatExecutor) {
            InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest();
            BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest);
            // start the heartbeat once registration has succeeded
            heartbeat(heartbeatRequest);
        }
    } catch (Exception e) {
        log.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e);
        rethrowRuntimeException(e);
    }
}
```

### Service deregistration

Deregistration relies on the JVM shutdown hook mechanism provided by Java, installed in `register`:

```java
Runtime.getRuntime().addShutdownHook(new Thread(() -> deregister(registration)));
```

```java
@Override
public void deregister(Registration registration) {
    log.info("De-registering from Polaris Server now...");
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to de-register for polaris client...");
        return;
    }
    InstanceDeregisterRequest deRegisterRequest = new InstanceDeregisterRequest();
    deRegisterRequest.setToken(polarisContext.getPolarisContextAgentProperties().getToken());
    // use the configured namespace, the same one used at registration time
    deRegisterRequest.setNamespace(polarisContext.getPolarisContextAgentProperties().getNamespace());
    deRegisterRequest.setService(registration.getServiceId());
    deRegisterRequest.setHost(registration.getHost());
    deRegisterRequest.setPort(registration.getPort());
    try {
        ProviderAPI providerClient = PolarisAPIFactory.getProviderApi();
        providerClient.deRegister(deRegisterRequest);
    } catch (Exception e) {
        log.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e);
    } finally {
        // stop sending heartbeats for the removed instance
        if (null != heartbeatExecutor) {
            heartbeatExecutor.shutdown();
        }
    }
    log.info("De-registration finished.");
}
```

### Service discovery

Implement the `DiscoveryClient` interface from the Spring Cloud abstraction and insert the implementation into Spring Cloud's list of `DiscoveryClient`s.

#### Overall flow

The `DiscoveryClient` interface mainly consists of two methods, `getInstances` and `getServices`:

```java
public interface DiscoveryClient extends Ordered {
    /**
     * Default order of the discovery client.
     */
    int DEFAULT_ORDER = 0;

    /**
     * A human-readable description of the implementation, used in HealthIndicator.
     * @return The description.
     */
    String description();

    /**
     * Gets all ServiceInstances associated with a particular serviceId.
     * @param serviceId The serviceId to query.
     * @return A List of ServiceInstance.
     */
    List<ServiceInstance> getInstances(String serviceId);

    /**
     * @return All known service IDs.
     */
    List<String> getServices();

    /**
     * Default implementation for getting order of discovery clients.
     * @return order
     */
    @Override
    default int getOrder() {
        return DEFAULT_ORDER;
    }
}
```

#### Polaris service discovery sequence

##### Interception logic

Intercept the constructor of `CompositeDiscoveryClient`: clear the original `DiscoveryClient` list, then add `PolarisDiscoveryClient` to it so that Polaris's discovery logic is executed:

```java
public class CompositeDiscoveryClient implements DiscoveryClient {

    private final List<DiscoveryClient> discoveryClients;

    public CompositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
        AnnotationAwareOrderComparator.sort(discoveryClients);
        this.discoveryClients = discoveryClients;
        // post-interception point of <init>:
        // replace the list contents with PolarisDiscoveryClient
    }
}
```

##### Implementing the discovery logic

`PolarisDiscoveryClient` implements the concrete logic that calls the Polaris `ConsumerAPI` to obtain services and their instances:

```java
/**
 * Returns the instance list of a service after routing.
 *
 * @param service service name
 * @return the service instances
 */
public InstancesResponse getFilteredInstances(String service) {
    String namespace = polarisAgentProperties.getNamespace();
    GetInstancesRequest getInstancesRequest = new GetInstancesRequest();
    getInstancesRequest.setNamespace(namespace);
    getInstancesRequest.setService(service);
    return consumerAPI.getInstances(getInstancesRequest);
}
```

### Load balancing: Ribbon

Our investigation found that although newer versions have gradually phased out Ribbon as the load balancer, Ribbon is still the default load balancer in Spring Cloud Greenwich (G) and Hoxton (H), and if it is pulled in with 2020.0 or later it will still be used.

#### Ribbon characteristics

In many places Ribbon does not follow the Spring Cloud service registration and discovery abstractions but defines its own interfaces, for example `ServerList`, `ILoadBalancer`, `IRule` and `IPing`.
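Ribbon's own interfaces can be implemented on top of Polaris data. The following is a minimal sketch that does not depend on Ribbon: `ServerListLike` is a hypothetical stand-in that only mirrors the two method names of Ribbon's `ServerList` (`getInitialListOfServers` and `getUpdatedListOfServers`), `HostPort` is a hypothetical server type, and the `Function` passed to the adapter stands in for a Polaris query such as `getFilteredInstances`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class PolarisServerListSketch {

    /** Hypothetical stand-in mirroring the method names of Ribbon's ServerList. */
    interface ServerListLike<T> {
        List<T> getInitialListOfServers();

        List<T> getUpdatedListOfServers();
    }

    /** Hypothetical server representation. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Adapts a service-name lookup (standing in for a Polaris query) to the Ribbon-style interface. */
    static final class PolarisBackedServerList implements ServerListLike<HostPort> {
        private final String serviceName;
        private final Function<String, List<HostPort>> lookup;

        PolarisBackedServerList(String serviceName, Function<String, List<HostPort>> lookup) {
            this.serviceName = serviceName;
            this.lookup = lookup;
        }

        @Override
        public List<HostPort> getInitialListOfServers() {
            return getUpdatedListOfServers();
        }

        @Override
        public List<HostPort> getUpdatedListOfServers() {
            // query on every call so a polling list updater sees instance changes
            return new ArrayList<>(lookup.apply(serviceName));
        }
    }

    public static void main(String[] args) {
        List<HostPort> registry = new ArrayList<>();
        registry.add(new HostPort("10.0.0.1", 8080));
        PolarisBackedServerList servers = new PolarisBackedServerList("demo-service", name -> registry);
        System.out.println(servers.getInitialListOfServers());
    }
}
```

Returning a copy keeps Ribbon-side code from mutating the lookup's list; a real implementation would also convert Polaris instances into Ribbon `Server` objects.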
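So the agent first implements these interfaces. When Ribbon is invoked, Polaris's routing policy returns the `ServerList`, and the configured load-balancing rule then completes the call. The key step is to replace the original load balancer with a new one that routes through Polaris's `RouterAPI`:

```java
public PolarisRoutingLoadBalancer(IClientConfig config, IRule rule, IPing ping,
                                  ServerList<Server> serverList, RouterAPI routerAPI,
                                  PolarisAgentProperties polarisAgentProperties) {
    // no server list filter; the server list is refreshed by a PollingServerListUpdater
    super(config, rule, ping, serverList, null, new PollingServerListUpdater());
    this.routerAPI = routerAPI;
    this.polarisAgentProperties = polarisAgentProperties;
}
```

#### Interception point

Ribbon initializes the load balancer in the `LoadBalancerContext` constructor, so once the constructor has finished, the agent only needs to point `this.lb` at the newly created `PolarisRoutingLoadBalancer`:

```java
public LoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) {
    this.lb = lb;
    initWithNiwsConfig(clientConfig);
    // post-interception point: replace this.lb with PolarisRoutingLoadBalancer
}
```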
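The post-constructor swap of `this.lb` can be illustrated with plain reflection. This is a minimal sketch under stated assumptions: `ContextStandIn` and `LoadBalancerStandIn` are hypothetical stand-ins for Ribbon's `LoadBalancerContext` and `ILoadBalancer`, and `afterConstructor` is a hypothetical helper that overwrites a field named `lb`, which is the effect a post-`<init>` interceptor would have when installing `PolarisRoutingLoadBalancer`. A real agent would run this from bytecode-injected advice rather than calling it by hand.

```java
import java.lang.reflect.Field;

public final class SwapLoadBalancerSketch {

    /** Hypothetical stand-in for ILoadBalancer. */
    interface LoadBalancerStandIn {
        String name();
    }

    /** Hypothetical stand-in for LoadBalancerContext, keeping its balancer in "lb". */
    static class ContextStandIn {
        protected LoadBalancerStandIn lb;

        ContextStandIn(LoadBalancerStandIn lb) {
            this.lb = lb;
            // post-<init> interception point in the real class
        }
    }

    /** What a post-constructor interceptor would do: overwrite the "lb" field. */
    static void afterConstructor(Object target, Object replacement) {
        // walk up the hierarchy, since the field may be declared in a superclass
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            Field field;
            try {
                field = c.getDeclaredField("lb");
            } catch (NoSuchFieldException e) {
                continue; // not declared here, try the superclass
            }
            try {
                field.setAccessible(true);
                field.set(target, replacement);
                return;
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot replace lb", e);
            }
        }
        throw new IllegalStateException("no field named lb on " + target.getClass());
    }

    public static void main(String[] args) {
        ContextStandIn ctx = new ContextStandIn(() -> "default");
        afterConstructor(ctx, (LoadBalancerStandIn) () -> "polaris-routing");
        System.out.println(ctx.lb.name());
    }
}
```

Walking the class hierarchy matters because the intercepted object may be a subclass of the class that declares `lb`.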
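### Functional architecture

The java-agent runs as an agent on each application node. It is injected into the user's Java process through Java Instrumentation and, by intercepting the execution logic of specific frameworks, connects them to the Polaris SDK to provide service governance capabilities.

The features to support include:

- Polaris native service governance: service registration/deregistration/heartbeat, service discovery and load balancing, dynamic routing, circuit breaking, and rate limiting
- Advanced scenario capabilities: lossless shutdown, call-chain tracing, full-link gray release, and so on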
### Running modes

Two usage modes must be supported:

1. Loaded at JVM startup through the `-javaagent` option, i.e. `java -javaagent:polaris-java-agent.jar`:

   ```java
   public static void premain(String agentArgument, Instrumentation instrumentation) throws Exception
   ```

2. Attached after the JVM has started, loaded through the Attach API:

   ```java
   public static void agentmain(String agentArgument, Instrumentation instrumentation) throws Exception
   ```
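Both entry points can delegate to one shared bootstrap routine. The sketch below is an assumption about how that could look, not the project's code: `AgentEntrySketch`, `bootstrap` and `BOOTSTRAP_COUNT` are hypothetical, and an `AtomicBoolean` guard makes a second load within the same class loader (for example an attach after the agent was already loaded with `-javaagent`) a no-op. The agent jar's manifest would also need `Premain-Class` and/or `Agent-Class` entries naming this class.

```java
import java.lang.instrument.Instrumentation;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public final class AgentEntrySketch {

    private static final AtomicBoolean STARTED = new AtomicBoolean(false);
    // counts bootstraps that actually ran; for illustration only
    static final AtomicInteger BOOTSTRAP_COUNT = new AtomicInteger();

    /** Called when loaded at JVM startup via -javaagent. */
    public static void premain(String agentArgument, Instrumentation instrumentation) {
        bootstrap(agentArgument, instrumentation);
    }

    /** Called when loaded into a running JVM through the Attach API. */
    public static void agentmain(String agentArgument, Instrumentation instrumentation) {
        bootstrap(agentArgument, instrumentation);
    }

    /** Hypothetical shared bootstrap; runs at most once per class loader. */
    static void bootstrap(String agentArgument, Instrumentation instrumentation) {
        if (!STARTED.compareAndSet(false, true)) {
            return; // already started through the other entry point
        }
        BOOTSTRAP_COUNT.incrementAndGet();
        // a real agent would register its transformers here, e.g. via
        // instrumentation.addTransformer(...), and read the -Dpolaris.* parameters
    }
}
```

`compareAndSet` keeps the guard correct even if both entry points race on different threads.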
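### Code architecture

- `polaris-agent-core`: the core layer. It contains the core functionality of polaris-agent, integrates with the polaris-java SDK, and uses plugins to intercept and adapt the interfaces of different frameworks in order to implement service governance features. The core layer is not tied to any specific bytecode injection framework; it consists purely of interceptor implementations.
- `polaris-agent-adapter`: the adapter layer. It provides the bootstrap capabilities for bytecode injection, implements the interceptor interfaces of the various bytecode injection frameworks, and adapts them to the plugins in the core layer.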
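The core/adapter split can be pictured as a framework-neutral interceptor contract in the core layer plus a thin adapter that a bytecode framework calls around the intercepted method. A minimal sketch, assuming hypothetical names (`CoreInterceptor`, `AroundAdapter`); the `after` signature mirrors the `after(Object target, Object[] args, Object result, Throwable throwable)` method used for registration, and a plain `Callable` stands in for the original method invocation a real bytecode framework would supply.

```java
import java.util.concurrent.Callable;

public final class LayeringSketch {

    /** Core layer: framework-neutral interceptor contract (hypothetical). */
    interface CoreInterceptor {
        default void before(Object target, Object[] args) {
        }

        default void after(Object target, Object[] args, Object result, Throwable throwable) {
        }
    }

    /** Adapter layer: called by bytecode-injection advice around the original method. */
    static final class AroundAdapter {
        private final CoreInterceptor interceptor;

        AroundAdapter(CoreInterceptor interceptor) {
            this.interceptor = interceptor;
        }

        Object invoke(Object target, Object[] args, Callable<?> original) throws Exception {
            interceptor.before(target, args);
            Object result = null;
            Throwable thrown = null;
            try {
                result = original.call();
                return result;
            } catch (Throwable t) {
                thrown = t;
                throw t; // precise rethrow: only Exception or unchecked types reach here
            } finally {
                // after() always runs and sees either the result or the failure
                interceptor.after(target, args, result, thrown);
            }
        }
    }
}
```

Only `AroundAdapter` would need rewriting to support a different bytecode framework; `CoreInterceptor` implementations stay unchanged.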