Skip to content

Commit

Permalink
IGNITE-19878 Restrict possibility to create two caches with the same …
Browse files Browse the repository at this point in the history
…schemas and index names - Fixes #10817.

Signed-off-by: zstan <[email protected]>
  • Loading branch information
zstan committed Jul 4, 2023
1 parent fe4d41b commit ab168e7
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
Expand All @@ -90,6 +91,7 @@
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT;
import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.validateIncomingConfiguration;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;

/**
Expand Down Expand Up @@ -1086,8 +1088,11 @@ private boolean processStartNewCacheRequest(
);
}

if (err == null)
err = QueryUtils.checkQueryEntityConflicts(req.startCacheConfiguration(), registeredCaches.values());

if (err == null) {
String conflictErr = checkCacheConflict(req.startCacheConfiguration());
String conflictErr = checkCacheConflict(req.startCacheConfiguration(), false);

if (conflictErr != null) {
U.warn(log, "Ignore cache start request. " + conflictErr);
Expand All @@ -1096,9 +1101,6 @@ private boolean processStartNewCacheRequest(
}
}

if (err == null)
err = QueryUtils.checkQueryEntityConflicts(req.startCacheConfiguration(), registeredCaches.values());

if (err == null) {
GridEncryptionManager encMgr = ctx.encryption();

Expand Down Expand Up @@ -1191,6 +1193,34 @@ else if (encMgr.masterKeyDigest() != null &&
return true;
}

/**
* Validate correcteness of new cache start request.
*
* @param err Current error.
* @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
* @param res Accumulator for cache change process results.
* @param req Cache change request.
*
* @return {@code True} if there is no errors due initialization.
*/
private boolean validateStartNewCache(
@Nullable IgniteCheckedException err,
boolean persistedCfgs,
CacheChangeProcessResult res,
DynamicCacheChangeRequest req
) {
if (err != null) {
if (persistedCfgs)
res.errs.add(err);
else
ctx.cache().completeCacheStartFuture(req, false, err);

return false;
}

return true;
}

/**
* @param dataBag Discovery data bag.
*/
Expand Down Expand Up @@ -1514,7 +1544,7 @@ public void validateNoNewCachesWithNewFormat(CacheNodeCommonDiscoveryData cluste
if (joinDiscoData != null) {
for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) {
if (!registeredCaches.containsKey(e.getKey())) {
conflictErr = checkCacheConflict(e.getValue().cacheData().config());
conflictErr = checkCacheConflict(e.getValue().cacheData().config(), true);

if (conflictErr != null) {
conflictErr = "Failed to start configured cache due to conflict with started caches. " +
Expand Down Expand Up @@ -2007,7 +2037,7 @@ public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();

if (!registeredCaches.containsKey(cfg.getName())) {
String conflictErr = checkCacheConflict(cfg);
String conflictErr = checkCacheConflict(cfg, true);

if (conflictErr != null) {
U.warn(log, "Ignore cache received from joining node. " + conflictErr);
Expand Down Expand Up @@ -2078,9 +2108,20 @@ private void processClientReconnectData(CacheClientReconnectDiscoveryData client
* Checks cache configuration on conflict with already registered caches and cache groups.
*
* @param cfg Cache configuration.
* @param checkIndexes If {@code true} check indexes conflicts.
* @return {@code null} if validation passed, error message in other case.
*/
private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
private String checkCacheConflict(CacheConfiguration<?, ?> cfg, boolean checkIndexes) {
if (checkIndexes) {
Collection<CacheConfiguration<?, ?>> processedCfgs = F.viewReadOnly(registeredCaches.values(),
(C1<DynamicCacheDescriptor, CacheConfiguration<?, ?>>)DynamicCacheDescriptor::cacheConfiguration);

String err = validateIncomingConfiguration(processedCfgs, cfg);

if (err != null)
return err;
}

int cacheId = CU.cacheId(cfg.getName());

if (cacheGroupByName(cfg.getName()) != null)
Expand Down Expand Up @@ -2143,7 +2184,7 @@ private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID node
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();

if (!registeredCaches.containsKey(cfg.getName())) {
String conflictErr = checkCacheConflict(cfg);
String conflictErr = checkCacheConflict(cfg, true);

if (conflictErr != null) {
if (locJoin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -48,6 +49,8 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
Expand All @@ -72,6 +75,8 @@
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
import static org.apache.ignite.internal.processors.query.QueryUtils.normalizeObjectName;
import static org.apache.ignite.internal.processors.query.QueryUtils.normalizeSchemaName;

/**
* Responsible for restoring local cache configurations (both from static configuration and persistence).
Expand Down Expand Up @@ -684,6 +689,16 @@ private void addCacheFromConfiguration(

CU.validateCacheName(cacheName);

Collection<CacheConfiguration<?, ?>> ccfgs = new ArrayList<>(caches.size());

for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : caches.values())
ccfgs.add(cacheInfo.cacheData().config());

String err = validateIncomingConfiguration(ccfgs, cfg);

if (err != null)
throw new IgniteException(err);

cacheProcessor.cloneCheckSerializable(cfg);

CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
Expand Down Expand Up @@ -719,6 +734,63 @@ private void addCacheFromConfiguration(
}
}

/**
* Validates already processed cache configuration instead a newly defined.
*
* @param cacheConfigs Already processed caches.
* @param cfg Currently processed cache config.
* @return Error message, if supplied configuration is incorrect.
* @throws IgniteException If misconfigured.
*/
@Nullable public static String validateIncomingConfiguration(
Collection<CacheConfiguration<?, ?>> cacheConfigs,
CacheConfiguration<?, ?> cfg
) {
Map<String, String> idxNamesPerCache = new HashMap<>();

String schemaName = normalizeSchemaName(cfg.getName(), cfg.getSqlSchema());

for (CacheConfiguration<?, ?> conf0 : cacheConfigs) {
Collection<QueryEntity> entrs = conf0.getQueryEntities();
String cacheName = conf0.getName();
String cacheSchemaName = normalizeSchemaName(conf0.getName(), conf0.getSqlSchema());

if (!Objects.equals(cacheSchemaName, schemaName) || CU.isSystemCache(cacheName) ||
(Objects.equals(cacheSchemaName, schemaName) && Objects.equals(cfg.getName(), cacheName)))
continue;

for (QueryEntity ent : entrs) {
Collection<QueryIndex> idxs = ent.getIndexes();

for (QueryIndex idx : idxs)
idxNamesPerCache.put(idx.getName(), cacheName);
}
}

if (idxNamesPerCache.isEmpty())
return null;

Collection<QueryEntity> entrs = cfg.getQueryEntities();

for (QueryEntity ent : entrs) {
Collection<QueryIndex> idxs = ent.getIndexes();

for (QueryIndex idx : idxs) {
String normalizedIdxName = normalizeObjectName(idx.getName(), false);

String cacheName = idxNamesPerCache.get(normalizedIdxName);

if (cacheName != null) {
return "Duplicate index name for [cache=" + cfg.getName() +
", idxName=" + idx.getName() + "], an equal index name is already configured for [cache=" +
cacheName + ']';
}
}
}

return null;
}

/**
* Validates cache configuration against stored cache configuration when persistence is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ public static <C extends Collection<T>, T> void assertNotContains(@Nullable Igni
* @param log Logger (optional).
* @param run Runnable.
* @param cls Exception class.
* @param msg Exception message (optional). If provided exception message
* and this message should be equal.
* @param msg Exception message (optional). Check that raised exception message contains this substring.
* @return Thrown throwable.
*/
public static Throwable assertThrows(
Expand Down
Loading

0 comments on commit ab168e7

Please sign in to comment.