Skip to content

Commit

Permalink
IGNITE-23202 Fixed server node fail when node service filter class un…
Browse files Browse the repository at this point in the history
…known (#11539)

Co-authored-by: Nikita Amelchev <[email protected]>
  • Loading branch information
nizhikov and NSAmelchev authored Oct 7, 2024
1 parent 61d3c20 commit cf98ead
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,16 @@ private void cancelDeployedServices() {

ServiceProcessorCommonDiscoveryData clusterData = (ServiceProcessorCommonDiscoveryData)data.commonData();

for (ServiceInfo desc : clusterData.registeredServices())
for (ServiceInfo desc : clusterData.registeredServices()) {
try {
unmarshalNodeFilterIfNeeded(desc.configuration());
}
catch (IgniteCheckedException e) {
throw new IgniteException("Cannot join the cluster.", e);
}

registerService(desc);
}
}

/** {@inheritDoc} */
Expand All @@ -413,15 +421,27 @@ private void cancelDeployedServices() {
ClusterNode node,
DiscoveryDataBag.JoiningNodeDiscoveryData data
) {
if (data.joiningNodeData() == null || !ctx.security().enabled())
if (data.joiningNodeData() == null)
return null;

List<ServiceInfo> svcs = ((ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData()).services();

SecurityException err = checkDeployPermissionDuringJoin(node, svcs);
if (ctx.security().enabled()) {
SecurityException err = checkDeployPermissionDuringJoin(node, svcs);

if (err != null)
return new IgniteNodeValidationResult(node.id(), err.getMessage());
if (err != null)
return new IgniteNodeValidationResult(node.id(), err.getMessage());
}

for (ServiceInfo svc : svcs) {
try {
unmarshalNodeFilterIfNeeded(svc.configuration());
}
catch (IgniteCheckedException e) {
return new IgniteNodeValidationResult(node.id(), "Node join is rejected [joiningNodeId=" + node.id() +
", msg=" + e.getMessage() + ']');
}
}

return null;
}
Expand Down Expand Up @@ -654,7 +674,7 @@ public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service s
*/
private PreparedConfigurations<IgniteUuid> prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs,
IgnitePredicate<ClusterNode> dfltNodeFilter) {
List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());
List<LazyServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());

List<GridServiceDeploymentFuture<IgniteUuid>> failedFuts = null;

Expand All @@ -679,12 +699,14 @@ private PreparedConfigurations<IgniteUuid> prepareServiceConfigurations(Collecti
if (err == null) {
try {
byte[] srvcBytes = U.marshal(marsh, cfg.getService());
byte[] nodeFilterBytes = U.marshal(marsh, cfg.getNodeFilter());
byte[] interceptorsBytes = U.marshal(marsh, cfg.getInterceptors());

String[] knownSvcMdtNames = cfg instanceof PlatformServiceConfiguration ?
((PlatformServiceConfiguration)cfg).mtdNames() : null;

cfgsCp.add(new LazyServiceConfiguration(cfg, srvcBytes, interceptorsBytes).platformMtdNames(knownSvcMdtNames));
cfgsCp.add(new LazyServiceConfiguration(cfg, srvcBytes, nodeFilterBytes, interceptorsBytes)
.platformMtdNames(knownSvcMdtNames));
}
catch (Exception e) {
U.error(log, "Failed to marshal service with configured marshaller " +
Expand Down Expand Up @@ -771,7 +793,7 @@ private IgniteInternalFuture<?> deployAll(@NotNull Collection<ServiceConfigurati

PreparedConfigurations<IgniteUuid> srvcCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter);

List<ServiceConfiguration> cfgsCp = srvcCfg.cfgs;
List<LazyServiceConfiguration> cfgsCp = srvcCfg.cfgs;

List<GridServiceDeploymentFuture<IgniteUuid>> failedFuts = srvcCfg.failedFuts;

Expand All @@ -781,7 +803,7 @@ private IgniteInternalFuture<?> deployAll(@NotNull Collection<ServiceConfigurati
try {
Collection<ServiceChangeAbstractRequest> reqs = new ArrayList<>();

for (ServiceConfiguration cfg : cfgsCp) {
for (LazyServiceConfiguration cfg : cfgsCp) {
IgniteUuid srvcId = IgniteUuid.randomUuid();

GridServiceDeploymentFuture<IgniteUuid> fut = new GridServiceDeploymentFuture<>(cfg, srvcId);
Expand Down Expand Up @@ -1421,6 +1443,23 @@ private Service copyAndInject(ServiceConfiguration cfg, ServiceContextImpl svcCt
}
}

/** @param cfg Lazy service configuration. */
private void unmarshalNodeFilterIfNeeded(LazyServiceConfiguration cfg) throws IgniteCheckedException {
if (cfg.getNodeFilter() != null)
return;

GridDeployment dep = ctx.deploy().getDeployment(cfg.serviceClassName());

ClassLoader clsLdr = U.resolveClassLoader(dep != null ? dep.classLoader() : null, ctx.config());

try {
cfg.setNodeFilter(U.unmarshal(marsh, cfg.nodeFilterBytes(), clsLdr));
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to unmarshal class of service node filter [cfg=" + cfg + ']', e);
}
}

/**
* @param ctxs Contexts to cancel.
* @param cancelCnt Number of contexts to cancel.
Expand Down Expand Up @@ -1712,7 +1751,7 @@ public ServiceDeploymentManager deployment() {
}
}

for (ServiceConfiguration srvcCfg : prepCfgs.cfgs) {
for (LazyServiceConfiguration srvcCfg : prepCfgs.cfgs) {
ServiceInfo srvcInfo = new ServiceInfo(ctx.localNodeId(), IgniteUuid.randomUuid(), srvcCfg, true);

srvcInfo.context(ctx);
Expand Down Expand Up @@ -1764,11 +1803,20 @@ else if (req instanceof ServiceUndeploymentRequest)
"exists : [" + "srvcId" + reqSrvcId + ", srvcTop=" + oldDesc.topologySnapshot() + ']');
}
else {
ServiceConfiguration cfg = ((ServiceDeploymentRequest)req).configuration();
LazyServiceConfiguration cfg = ((ServiceDeploymentRequest)req).configuration();

if (ctx.security().enabled())
err = checkPermissions(((ServiceDeploymentRequest)req).configuration().getName(), SERVICE_DEPLOY);

if (err == null) {
try {
unmarshalNodeFilterIfNeeded(cfg);
}
catch (IgniteCheckedException e) {
err = new IgniteCheckedException("Failed to deploy service.", e);
}
}

if (err == null) {
oldDesc = lookupInRegisteredServices(cfg.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.ignite.internal.processors.service;

import java.util.Arrays;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceCallInterceptor;
import org.apache.ignite.services.ServiceConfiguration;
import org.jetbrains.annotations.Nullable;

/**
* Lazy service configuration.
Expand All @@ -36,6 +37,10 @@ public class LazyServiceConfiguration extends ServiceConfiguration {
@GridToStringExclude
private transient Service srvc;

/** Node filter. */
@GridToStringExclude
private transient IgnitePredicate<ClusterNode> nodeFilter;

/** Service interceptors. */
@GridToStringExclude
private transient ServiceCallInterceptor[] interceptors;
Expand All @@ -46,6 +51,9 @@ public class LazyServiceConfiguration extends ServiceConfiguration {
/** */
private byte[] srvcBytes;

/** */
private byte[] nodeFilterBytes;

/** */
private byte[] interceptorsBytes;

Expand All @@ -65,7 +73,12 @@ public LazyServiceConfiguration() {
* @param srvcBytes Marshalled service.
* @param interceptorsBytes Marshalled interceptors.
*/
public LazyServiceConfiguration(ServiceConfiguration cfg, byte[] srvcBytes, @Nullable byte[] interceptorsBytes) {
public LazyServiceConfiguration(
ServiceConfiguration cfg,
byte[] srvcBytes,
byte[] nodeFilterBytes,
byte[] interceptorsBytes
) {
assert cfg.getService() != null : cfg;
assert srvcBytes != null;

Expand All @@ -75,6 +88,7 @@ public LazyServiceConfiguration(ServiceConfiguration cfg, byte[] srvcBytes, @Nul
cacheName = cfg.getCacheName();
affKey = cfg.getAffinityKey();
nodeFilter = cfg.getNodeFilter();
this.nodeFilterBytes = nodeFilterBytes;
this.srvcBytes = srvcBytes;
srvc = cfg.getService();
srvcClsName = srvc.getClass().getName();
Expand All @@ -83,6 +97,13 @@ public LazyServiceConfiguration(ServiceConfiguration cfg, byte[] srvcBytes, @Nul
this.interceptorsBytes = interceptorsBytes;
}

/**
* @return Node filter bytes.
*/
public byte[] nodeFilterBytes() {
return nodeFilterBytes;
}

/**
* @return Service bytes.
*/
Expand All @@ -104,6 +125,18 @@ public String serviceClassName() {
return srvc;
}

/** {@inheritDoc} */
@Override public ServiceConfiguration setNodeFilter(IgnitePredicate<ClusterNode> nodeFilter) {
this.nodeFilter = nodeFilter;

return this;
}

/** {@inheritDoc} */
@Override public IgnitePredicate<ClusterNode> getNodeFilter() {
return nodeFilter;
}

/** {@inheritDoc} */
@Override public ServiceCallInterceptor[] getInterceptors() {
return interceptors;
Expand All @@ -116,6 +149,19 @@ public byte[] interceptorBytes() {
return interceptorsBytes;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (!(o instanceof LazyServiceConfiguration))
return super.equals(o);

if (!equalsIgnoreNodeFilter(o))
return false;

LazyServiceConfiguration that = (LazyServiceConfiguration)o;

return Arrays.equals(nodeFilterBytes, that.nodeFilterBytes);
}

/** {@inheritDoc} */
@SuppressWarnings("RedundantIfStatement")
@Override public boolean equalsIgnoreNodeFilter(Object o) {
Expand Down Expand Up @@ -168,6 +214,7 @@ String[] platformMtdNames() {
String svcCls = srvc == null ? "" : srvc.getClass().getSimpleName();
String nodeFilterCls = nodeFilter == null ? "" : nodeFilter.getClass().getSimpleName();

return S.toString(LazyServiceConfiguration.class, this, "svcCls", svcCls, "nodeFilterCls", nodeFilterCls);
return S.toString(LazyServiceConfiguration.class, this, "name", name, "svcCls", svcCls,
"nodeFilterCls", nodeFilterCls);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import java.io.Serializable;
import java.util.List;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.services.ServiceConfiguration;

/**
* Result of services validation before deployment.
*/
class PreparedConfigurations<T extends Serializable> {
/** */
final List<ServiceConfiguration> cfgs;
final List<LazyServiceConfiguration> cfgs;

/** */
final List<GridServiceDeploymentFuture<T>> failedFuts;
Expand All @@ -36,7 +35,7 @@ class PreparedConfigurations<T extends Serializable> {
* @param cfgs Configurations to deploy.
* @param failedFuts Finished futures for failed configurations.
*/
PreparedConfigurations(List<ServiceConfiguration> cfgs, List<GridServiceDeploymentFuture<T>> failedFuts) {
PreparedConfigurations(List<LazyServiceConfiguration> cfgs, List<GridServiceDeploymentFuture<T>> failedFuts) {
this.cfgs = cfgs;
this.failedFuts = failedFuts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.services.ServiceConfiguration;
import org.jetbrains.annotations.NotNull;

/**
Expand All @@ -30,13 +29,13 @@ public class ServiceDeploymentRequest extends ServiceChangeAbstractRequest {
private static final long serialVersionUID = 0L;

/** Service configuration. */
private final ServiceConfiguration cfg;
private final LazyServiceConfiguration cfg;

/**
* @param srvcId Service id.
* @param cfg Service configuration.
*/
public ServiceDeploymentRequest(@NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg) {
public ServiceDeploymentRequest(@NotNull IgniteUuid srvcId, @NotNull LazyServiceConfiguration cfg) {
super(srvcId);

this.cfg = cfg;
Expand All @@ -45,7 +44,7 @@ public ServiceDeploymentRequest(@NotNull IgniteUuid srvcId, @NotNull ServiceConf
/**
* @return Service configuration.
*/
public ServiceConfiguration configuration() {
public LazyServiceConfiguration configuration() {
return cfg;
}

Expand Down
Loading

0 comments on commit cf98ead

Please sign in to comment.