diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 888b3dfed9f..1945218b687 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -77,6 +77,7 @@ #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt( parsetree); + if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt) { ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); @@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt, PreprocessCreateExtensionStmtForCitusColumnar(parsetree); } + if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree)) + { + /* + * Citus maintains a higher level cache. We use the cache invalidation mechanism + * of Postgres to achieve cache coherency between backends. Any change to citus + * extension should be made known to other backends. We do this by invalidating the + * relcache and therefore invoking the citus registered callback that invalidates + * the citus cache in other backends. + */ + CacheInvalidateRelcacheAll(); + } + /* * Make sure that on DROP DATABASE we terminate the background daemon * associated with it. @@ -926,15 +940,6 @@ ProcessUtilityInternal(PlannedStmt *pstmt, } } } - - if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt)) - { - /* - * Ensure value is valid, we can't do some checks during CREATE - * EXTENSION. This is important to register some invalidation callbacks. - */ - CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */ - } } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 5ccd4a512a8..55d0f11c57f 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -133,6 +133,19 @@ typedef struct ShardIdCacheEntry int shardIndex; } ShardIdCacheEntry; +/* + * ExtensionCreatedState is used to track if citus extension has been created + * using CREATE EXTENSION command. + * UNKNOWN : MetadataCache is invalid. State is UNKNOWN. + * CREATED : Citus is created. + * NOTCREATED : Citus is not created. + */ +typedef enum ExtensionCreatedState +{ + UNKNOWN = 0, + CREATED = 1, + NOTCREATED = 2, +} ExtensionCreatedState; /* * State which should be cleared upon DROP EXTENSION. When the configuration @@ -140,7 +153,7 @@ typedef struct ShardIdCacheEntry */ typedef struct MetadataCacheData { - bool extensionLoaded; + ExtensionCreatedState extensionCreatedState; Oid distShardRelationId; Oid distPlacementRelationId; Oid distBackgroundJobRelationId; @@ -288,7 +301,6 @@ static void CreateDistTableCache(void); static void CreateShardIdCache(void); static void CreateDistObjectCache(void); static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId); -static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId); @@ -2187,16 +2199,30 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray, bool CitusHasBeenLoaded(void) { - if (!MetadataCache.extensionLoaded || creating_extension) + /* + * We do not use Citus hooks during CREATE/ALTER EXTENSION citus + * since the objects used by the C code might be not be there yet. + */ + if (creating_extension) { - /* - * Refresh if we have not determined whether the extension has been - * loaded yet, or in case of ALTER EXTENSION since we want to treat - * Citus as "not loaded" during ALTER EXTENSION citus. - */ - bool extensionLoaded = CitusHasBeenLoadedInternal(); + Oid citusExtensionOid = get_extension_oid("citus", true); + + if (CurrentExtensionObject == citusExtensionOid) + { + return false; + } + } - if (extensionLoaded && !MetadataCache.extensionLoaded) + /* + * If extensionCreatedState is UNKNOWN, query pg_extension for Citus + * and cache the result. Otherwise return the value extensionCreatedState + * indicates. + */ + if (MetadataCache.extensionCreatedState == UNKNOWN) + { + bool extensionCreated = CitusHasBeenLoadedInternal(); + + if (extensionCreated) { /* * Loaded Citus for the first time in this session, or first time after @@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void) */ StartupCitusBackend(); - /* - * InvalidateDistRelationCacheCallback resets state such as extensionLoaded - * when it notices changes to pg_dist_partition (which usually indicate - * `DROP EXTENSION citus;` has been run) - * - * Ensure InvalidateDistRelationCacheCallback will notice those changes - * by caching pg_dist_partition's oid. - * - * We skip these checks during upgrade since pg_dist_partition is not - * present during early stages of upgrade operation. - */ - DistPartitionRelationId(); - /* * This needs to be initialized so we can receive foreign relation graph * invalidation messages in InvalidateForeignRelationGraphCacheCallback(). * See the comments of InvalidateForeignKeyGraph for more context. */ DistColocationRelationId(); - } - MetadataCache.extensionLoaded = extensionLoaded; + MetadataCache.extensionCreatedState = CREATED; + } + else + { + MetadataCache.extensionCreatedState = NOTCREATED; + } } - return MetadataCache.extensionLoaded; + return (MetadataCache.extensionCreatedState == CREATED) ? true : false; } @@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void) return false; } - if (creating_extension && CurrentExtensionObject == citusExtensionOid) - { - /* - * We do not use Citus hooks during CREATE/ALTER EXTENSION citus - * since the objects used by the C code might be not be there yet. - */ - return false; - } - /* citus extension exists and has been created */ return true; } @@ -4201,10 +4209,6 @@ InitializeDistCache(void) CreateShardIdCache(); InitializeDistObjectCache(); - - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, - (Datum) 0); } @@ -4754,7 +4758,7 @@ InvalidateForeignKeyGraph(void) * InvalidateDistRelationCacheCallback flushes cache entries when a relation * is updated (or flushes the entire cache). */ -static void +void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { /* invalidate either entire cache or a specific entry */ @@ -4762,12 +4766,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { InvalidateDistTableCache(); InvalidateDistObjectCache(); + InvalidateMetadataSystemCache(); } else { void *hashKey = (void *) &relationId; bool foundInCache = false; + if (DistTableCacheHash == NULL) + { + return; + } + CitusTableCacheEntrySlot *cacheSlot = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache); if (foundInCache) @@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) } /* - * If pg_dist_partition is being invalidated drop all state - * This happens pretty rarely, but most importantly happens during - * DROP EXTENSION citus; This isn't the only time when this happens - * though, it can happen for multiple other reasons, such as an - * autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE - * wouldn't really need a full Metadata cache invalidation, but we - * don't know how to differentiate between DROP EXTENSION and ANALYZE. - * So for now we simply drop it in both cases and take the slight - * temporary performance hit. + * if pg_dist_partition relcache is invalidated for some reason, + * invalidate the MetadataCache. It is likely an overkill to invalidate + * the entire cache here. But until a better fix, we keep it this way + * for postgres regression tests that includes + * REINDEX SCHEMA CONCURRENTLY pg_catalog + * command. */ if (relationId == MetadataCache.distPartitionRelationId) { InvalidateMetadataSystemCache(); } + if (relationId == MetadataCache.distObjectRelationId) { InvalidateDistObjectCache(); @@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void) CitusTableCacheEntrySlot *cacheSlot = NULL; HASH_SEQ_STATUS status; + if (DistTableCacheHash == NULL) + { + return; + } + hash_seq_init(&status, DistTableCacheHash); while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) @@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void) DistObjectCacheEntry *cacheEntry = NULL; HASH_SEQ_STATUS status; + if (DistObjectCacheHash == NULL) + { + return; + } + hash_seq_init(&status, DistObjectCacheHash); while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL) @@ -4930,8 +4948,8 @@ CreateDistObjectCache(void) /* - * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag, - * and invalidates the worker node, ConnParams, and local group ID caches. + * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionCreatedState + * flag and invalidates the worker node, ConnParams, and local group ID caches. */ void InvalidateMetadataSystemCache(void) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1f4cee0372f..e5d593295a3 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -109,6 +109,8 @@ #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -554,6 +556,9 @@ _PG_init(void) "ColumnarSupportsIndexAM", true, &handle); + CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, + (Datum) 0); + INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr); INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable); INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4e918ecf7eb..34b95b859e4 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -137,6 +137,8 @@ typedef enum ANY_CITUS_TABLE_TYPE } CitusTableType; +void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); + extern List * AllCitusTableIds(void); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry); diff --git a/src/test/regress/citus_tests/test/test_extension.py b/src/test/regress/citus_tests/test/test_extension.py new file mode 100644 index 00000000000..e9b90f1157e --- /dev/null +++ b/src/test/regress/citus_tests/test/test_extension.py @@ -0,0 +1,44 @@ +import psycopg +import pytest + + +def test_create_drop_citus(coord): + with coord.cur() as cur1: + with coord.cur() as cur2: + # Conn1 drops the extension + # and Conn2 cannot use it. + cur1.execute("DROP EXTENSION citus") + + with pytest.raises(psycopg.errors.UndefinedFunction): + # Conn1 dropped the extension. citus_version udf + # cannot be found.sycopg.errors.UndefinedFunction + # is expected here. + cur2.execute("SELECT citus_version();") + + # Conn2 creates the extension, + # Conn1 is able to use it immediadtely. + cur2.execute("CREATE EXTENSION citus") + cur1.execute("SELECT citus_version();") + cur1.execute("DROP EXTENSION citus;") + + with coord.cur() as cur1: + with coord.cur() as cur2: + # A connection is able to create and use the extension + # within a transaction block. + cur1.execute("BEGIN;") + cur1.execute("CREATE TABLE t1(id int);") + cur1.execute("CREATE EXTENSION citus;") + cur1.execute("SELECT create_reference_table('t1')") + cur1.execute("ABORT;") + + # Conn1 aborted so Conn2 is be able to create and + # use the extension within a transaction block. + cur2.execute("BEGIN;") + cur2.execute("CREATE TABLE t1(id int);") + cur2.execute("CREATE EXTENSION citus;") + cur2.execute("SELECT create_reference_table('t1')") + cur2.execute("COMMIT;") + + # Conn2 commited so Conn1 is be able to use the + # extension immediately. + cur1.execute("SELECT citus_version();")