Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-19878 Restrict possibility to create two caches with the same schemas and index names #10817

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
Expand All @@ -90,6 +91,7 @@
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT;
import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.validateIncomingConfiguration;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;

/**
Expand Down Expand Up @@ -1086,33 +1088,41 @@ private boolean processStartNewCacheRequest(
);
}

if (err == null) {
String conflictErr = checkCacheConflict(req.startCacheConfiguration());
if (!validateStartNewCache(err, persistedCfgs, res, req))
return false;

if (conflictErr != null) {
U.warn(log, "Ignore cache start request. " + conflictErr);
err = QueryUtils.checkQueryEntityConflicts(req.startCacheConfiguration(), registeredCaches.values());

err = new IgniteCheckedException("Failed to start cache. " + conflictErr);
}
if (!validateStartNewCache(err, persistedCfgs, res, req))
return false;

String conflictErr = checkCacheConflict(req.startCacheConfiguration(), false);

if (conflictErr != null) {
U.warn(log, "Ignore cache start request. " + conflictErr);

err = new IgniteCheckedException("Failed to start cache. " + conflictErr);
}

if (err == null)
err = QueryUtils.checkQueryEntityConflicts(req.startCacheConfiguration(), registeredCaches.values());
if (!validateStartNewCache(err, persistedCfgs, res, req))
return false;

if (err == null) {
GridEncryptionManager encMgr = ctx.encryption();
GridEncryptionManager encMgr = ctx.encryption();

if (ccfg.isEncryptionEnabled()) {
if (encMgr.isMasterKeyChangeInProgress())
err = new IgniteCheckedException("Cache start failed. Master key change is in progress.");
else if (encMgr.masterKeyDigest() != null &&
!Arrays.equals(encMgr.masterKeyDigest(), req.masterKeyDigest())) {
err = new IgniteCheckedException("Cache start failed. The request was initiated before " +
"the master key change and can't be processed.");
}
if (ccfg.isEncryptionEnabled()) {
if (encMgr.isMasterKeyChangeInProgress())
err = new IgniteCheckedException("Cache start failed. Master key change is in progress.");
else if (encMgr.masterKeyDigest() != null &&
!Arrays.equals(encMgr.masterKeyDigest(), req.masterKeyDigest())) {
err = new IgniteCheckedException("Cache start failed. The request was initiated before " +
"the master key change and can't be processed.");
}

if (err != null)
U.warn(log, "Ignore cache start request during the master key change process.", err);
if (err != null) {
U.warn(log, "Ignore cache start request during the master key change process.", err);

if (!validateStartNewCache(err, persistedCfgs, res, req))
zstan marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
}

Expand All @@ -1126,14 +1136,8 @@ else if (encMgr.masterKeyDigest() != null &&
}
}

if (err != null) {
if (persistedCfgs)
res.errs.add(err);
else
ctx.cache().completeCacheStartFuture(req, false, err);

if (!validateStartNewCache(err, persistedCfgs, res, req))
return false;
}

assert req.cacheType() != null : req;
assert F.eq(ccfg.getName(), cacheName) : req;
Expand Down Expand Up @@ -1191,6 +1195,34 @@ else if (encMgr.masterKeyDigest() != null &&
return true;
}

/**
* Validate correcteness of new cache start request.
*
* @param err Current error.
* @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
* @param res Accumulator for cache change process results.
* @param req Cache change request.
*
* @return {@code True} if there is no errors due initialization.
*/
private boolean validateStartNewCache(
@Nullable IgniteCheckedException err,
boolean persistedCfgs,
CacheChangeProcessResult res,
DynamicCacheChangeRequest req
) {
if (err != null) {
if (persistedCfgs)
res.errs.add(err);
else
ctx.cache().completeCacheStartFuture(req, false, err);

return false;
}

return true;
}

/**
* @param dataBag Discovery data bag.
*/
Expand Down Expand Up @@ -1514,7 +1546,7 @@ public void validateNoNewCachesWithNewFormat(CacheNodeCommonDiscoveryData cluste
if (joinDiscoData != null) {
for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) {
if (!registeredCaches.containsKey(e.getKey())) {
conflictErr = checkCacheConflict(e.getValue().cacheData().config());
conflictErr = checkCacheConflict(e.getValue().cacheData().config(), true);

if (conflictErr != null) {
conflictErr = "Failed to start configured cache due to conflict with started caches. " +
Expand Down Expand Up @@ -2007,7 +2039,7 @@ public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();

if (!registeredCaches.containsKey(cfg.getName())) {
String conflictErr = checkCacheConflict(cfg);
String conflictErr = checkCacheConflict(cfg, true);

if (conflictErr != null) {
U.warn(log, "Ignore cache received from joining node. " + conflictErr);
Expand Down Expand Up @@ -2078,9 +2110,20 @@ private void processClientReconnectData(CacheClientReconnectDiscoveryData client
* Checks cache configuration on conflict with already registered caches and cache groups.
*
* @param cfg Cache configuration.
* @param checkIndexes If {@code true} check indexes conflicts.
* @return {@code null} if validation passed, error message in other case.
*/
private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
private String checkCacheConflict(CacheConfiguration<?, ?> cfg, boolean checkIndexes) {
if (checkIndexes) {
Collection<CacheConfiguration<?, ?>> processedCfgs = F.viewReadOnly(registeredCaches.values(),
(C1<DynamicCacheDescriptor, CacheConfiguration<?, ?>>)DynamicCacheDescriptor::cacheConfiguration);

String err = validateIncomingConfiguration(processedCfgs, cfg);

if (err != null)
return err;
}

int cacheId = CU.cacheId(cfg.getName());

if (cacheGroupByName(cfg.getName()) != null)
Expand Down Expand Up @@ -2143,7 +2186,7 @@ private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID node
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();

if (!registeredCaches.containsKey(cfg.getName())) {
String conflictErr = checkCacheConflict(cfg);
String conflictErr = checkCacheConflict(cfg, true);

if (conflictErr != null) {
if (locJoin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -48,6 +49,8 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
Expand All @@ -72,6 +75,8 @@
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
import static org.apache.ignite.internal.processors.query.QueryUtils.normalizeObjectName;
import static org.apache.ignite.internal.processors.query.QueryUtils.normalizeSchemaName;

/**
* Responsible for restoring local cache configurations (both from static configuration and persistence).
Expand Down Expand Up @@ -684,6 +689,16 @@ private void addCacheFromConfiguration(

CU.validateCacheName(cacheName);

Collection<CacheConfiguration<?, ?>> ccfgs = new ArrayList<>(caches.size());

for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : caches.values())
ccfgs.add(cacheInfo.cacheData().config());

String err = validateIncomingConfiguration(ccfgs, cfg);

if (err != null)
throw new IgniteException(err);

cacheProcessor.cloneCheckSerializable(cfg);

CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
Expand Down Expand Up @@ -719,6 +734,63 @@ private void addCacheFromConfiguration(
}
}

/**
* Validates already processed cache configuration instead a newly defined.
*
* @param cacheConfigs Already processed caches.
* @param cfg Currently processed cache config.
* @return Error message, if supplied configuration is incorrect.
* @throws IgniteException If misconfigured.
*/
@Nullable public static String validateIncomingConfiguration(
Collection<CacheConfiguration<?, ?>> cacheConfigs,
CacheConfiguration<?, ?> cfg
) {
Map<String, String> idxNamesPerCache = new HashMap<>();

String schemaName = normalizeSchemaName(cfg.getName(), cfg.getSqlSchema());

for (CacheConfiguration<?, ?> conf0 : cacheConfigs) {
Collection<QueryEntity> entrs = conf0.getQueryEntities();
String cacheName = conf0.getName();
String cacheSchemaName = normalizeSchemaName(conf0.getName(), conf0.getSqlSchema());

if (!Objects.equals(cacheSchemaName, schemaName) || CU.isSystemCache(cacheName) ||
(Objects.equals(cacheSchemaName, schemaName) && Objects.equals(cfg.getName(), cacheName)))
continue;

for (QueryEntity ent : entrs) {
Collection<QueryIndex> idxs = ent.getIndexes();

for (QueryIndex idx : idxs)
idxNamesPerCache.put(idx.getName(), cacheName);
}
}

if (idxNamesPerCache.isEmpty())
return null;

Collection<QueryEntity> entrs = cfg.getQueryEntities();

for (QueryEntity ent : entrs) {
Collection<QueryIndex> idxs = ent.getIndexes();

for (QueryIndex idx : idxs) {
String normalizedIdxName = normalizeObjectName(idx.getName(), false);

String cacheName = idxNamesPerCache.get(normalizedIdxName);

if (cacheName != null) {
return "Duplicate index name for [cache=" + cfg.getName() +
", idxName=" + idx.getName() + "], an equal index name is already configured for [cache=" +
cacheName + ']';
}
}
}

return null;
}

/**
* Validates cache configuration against stored cache configuration when persistence is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ public static <C extends Collection<T>, T> void assertNotContains(@Nullable Igni
* @param log Logger (optional).
* @param run Runnable.
* @param cls Exception class.
* @param msg Exception message (optional). If provided exception message
* and this message should be equal.
* @param msg Exception message (optional). Check that raised exception message contains this substring.
* @return Thrown throwable.
*/
public static Throwable assertThrows(
Expand Down
Loading