Skip to content

Commit

Permalink
Improve the performance of CitusHasBeenLoaded function for a database…
Browse files Browse the repository at this point in the history
… that does not do CREATE EXTENSION citus but load citus.so. (#7123)

For a database that does not create the citus extension by running

`  CREATE EXTENSION citus;`

`CitusHasBeenLoaded ` function ends up querying the `pg_extension` table
every time it is invoked. This is not an ideal situation for a such a
database.

The idea in this PR is as follows:

### A new field in MetadataCache.
 Add a new variable `extensionCreatedState `of the following type:

```
typedef enum ExtensionCreatedState
{
        UNKNOWN = 0,
        CREATED = 1,
        NOTCREATED = 2,
} ExtensionCreatedState;
```
When the MetadataCache is invalidated, `ExtensionCreatedState` will be
set to UNKNOWN.
     
### Invalidate MetadataCache when CREATE/DROP/ALTER EXTENSION citus
commands are run.

- Register a callback function, named
`InvalidateDistRelationCacheCallback`, for relcache invalidation during
the shared library initialization for `citus.so`. This callback function
is invoked in all the backends whenever the relcache is invalidated in
one of the backends. (This could be caused many DDLs operations).

- In the cache invalidation callback,`
InvalidateDistRelationCacheCallback`, invalidate `MetadataCache` zeroing
it out.
 
- In `CitusHasBeenLoaded`, perform the costly citus is loaded check only
if the `MetadataCache` is not valid.
 
### Downsides

Any relcache invalidation (caused by various DDL operations) will case
Citus MetadataCache to get invalidated. Most of the time it will be
unnecessary. But we rely on that DDL operations on relations will not be
too frequent.
  • Loading branch information
emelsimsek authored Sep 5, 2023
1 parent 1d540b6 commit a849570
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 60 deletions.
23 changes: 14 additions & 9 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"

Expand Down Expand Up @@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,

bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
parsetree);

if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
Expand All @@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PreprocessCreateExtensionStmtForCitusColumnar(parsetree);
}

if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree))
{
/*
* Citus maintains a higher level cache. We use the cache invalidation mechanism
* of Postgres to achieve cache coherency between backends. Any change to citus
* extension should be made known to other backends. We do this by invalidating the
* relcache and therefore invoking the citus registered callback that invalidates
* the citus cache in other backends.
*/
CacheInvalidateRelcacheAll();
}

/*
* Make sure that on DROP DATABASE we terminate the background daemon
* associated with it.
Expand Down Expand Up @@ -926,15 +940,6 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
}
}
}

if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt))
{
/*
* Ensure value is valid, we can't do some checks during CREATE
* EXTENSION. This is important to register some invalidation callbacks.
*/
CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */
}
}


Expand Down
120 changes: 69 additions & 51 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,27 @@ typedef struct ShardIdCacheEntry
int shardIndex;
} ShardIdCacheEntry;

/*
* ExtensionCreatedState is used to track if citus extension has been created
* using CREATE EXTENSION command.
* UNKNOWN : MetadataCache is invalid. State is UNKNOWN.
* CREATED : Citus is created.
* NOTCREATED : Citus is not created.
*/
typedef enum ExtensionCreatedState
{
UNKNOWN = 0,
CREATED = 1,
NOTCREATED = 2,
} ExtensionCreatedState;

/*
* State which should be cleared upon DROP EXTENSION. When the configuration
* changes, e.g. because extension is dropped, these summarily get set to 0.
*/
typedef struct MetadataCacheData
{
bool extensionLoaded;
ExtensionCreatedState extensionCreatedState;
Oid distShardRelationId;
Oid distPlacementRelationId;
Oid distBackgroundJobRelationId;
Expand Down Expand Up @@ -288,7 +301,6 @@ static void CreateDistTableCache(void);
static void CreateShardIdCache(void);
static void CreateDistObjectCache(void);
static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId);
Expand Down Expand Up @@ -2187,16 +2199,30 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
bool
CitusHasBeenLoaded(void)
{
if (!MetadataCache.extensionLoaded || creating_extension)
/*
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
* since the objects used by the C code might be not be there yet.
*/
if (creating_extension)
{
/*
* Refresh if we have not determined whether the extension has been
* loaded yet, or in case of ALTER EXTENSION since we want to treat
* Citus as "not loaded" during ALTER EXTENSION citus.
*/
bool extensionLoaded = CitusHasBeenLoadedInternal();
Oid citusExtensionOid = get_extension_oid("citus", true);

if (CurrentExtensionObject == citusExtensionOid)
{
return false;
}
}

if (extensionLoaded && !MetadataCache.extensionLoaded)
/*
* If extensionCreatedState is UNKNOWN, query pg_extension for Citus
* and cache the result. Otherwise return the value extensionCreatedState
* indicates.
*/
if (MetadataCache.extensionCreatedState == UNKNOWN)
{
bool extensionCreated = CitusHasBeenLoadedInternal();

if (extensionCreated)
{
/*
* Loaded Citus for the first time in this session, or first time after
Expand All @@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void)
*/
StartupCitusBackend();

/*
* InvalidateDistRelationCacheCallback resets state such as extensionLoaded
* when it notices changes to pg_dist_partition (which usually indicate
* `DROP EXTENSION citus;` has been run)
*
* Ensure InvalidateDistRelationCacheCallback will notice those changes
* by caching pg_dist_partition's oid.
*
* We skip these checks during upgrade since pg_dist_partition is not
* present during early stages of upgrade operation.
*/
DistPartitionRelationId();

/*
* This needs to be initialized so we can receive foreign relation graph
* invalidation messages in InvalidateForeignRelationGraphCacheCallback().
* See the comments of InvalidateForeignKeyGraph for more context.
*/
DistColocationRelationId();
}

MetadataCache.extensionLoaded = extensionLoaded;
MetadataCache.extensionCreatedState = CREATED;
}
else
{
MetadataCache.extensionCreatedState = NOTCREATED;
}
}

return MetadataCache.extensionLoaded;
return (MetadataCache.extensionCreatedState == CREATED) ? true : false;
}


Expand All @@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void)
return false;
}

if (creating_extension && CurrentExtensionObject == citusExtensionOid)
{
/*
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
* since the objects used by the C code might be not be there yet.
*/
return false;
}

/* citus extension exists and has been created */
return true;
}
Expand Down Expand Up @@ -4201,10 +4209,6 @@ InitializeDistCache(void)
CreateShardIdCache();

InitializeDistObjectCache();

/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);
}


Expand Down Expand Up @@ -4754,20 +4758,26 @@ InvalidateForeignKeyGraph(void)
* InvalidateDistRelationCacheCallback flushes cache entries when a relation
* is updated (or flushes the entire cache).
*/
static void
void
InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
{
/* invalidate either entire cache or a specific entry */
if (relationId == InvalidOid)
{
InvalidateDistTableCache();
InvalidateDistObjectCache();
InvalidateMetadataSystemCache();
}
else
{
void *hashKey = (void *) &relationId;
bool foundInCache = false;

if (DistTableCacheHash == NULL)
{
return;
}

CitusTableCacheEntrySlot *cacheSlot =
hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
if (foundInCache)
Expand All @@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
}

/*
* If pg_dist_partition is being invalidated drop all state
* This happens pretty rarely, but most importantly happens during
* DROP EXTENSION citus; This isn't the only time when this happens
* though, it can happen for multiple other reasons, such as an
* autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE
* wouldn't really need a full Metadata cache invalidation, but we
* don't know how to differentiate between DROP EXTENSION and ANALYZE.
* So for now we simply drop it in both cases and take the slight
* temporary performance hit.
* if pg_dist_partition relcache is invalidated for some reason,
* invalidate the MetadataCache. It is likely an overkill to invalidate
* the entire cache here. But until a better fix, we keep it this way
* for postgres regression tests that includes
* REINDEX SCHEMA CONCURRENTLY pg_catalog
* command.
*/
if (relationId == MetadataCache.distPartitionRelationId)
{
InvalidateMetadataSystemCache();
}


if (relationId == MetadataCache.distObjectRelationId)
{
InvalidateDistObjectCache();
Expand Down Expand Up @@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void)
CitusTableCacheEntrySlot *cacheSlot = NULL;
HASH_SEQ_STATUS status;

if (DistTableCacheHash == NULL)
{
return;
}

hash_seq_init(&status, DistTableCacheHash);

while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
Expand All @@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void)
DistObjectCacheEntry *cacheEntry = NULL;
HASH_SEQ_STATUS status;

if (DistObjectCacheHash == NULL)
{
return;
}

hash_seq_init(&status, DistObjectCacheHash);

while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL)
Expand Down Expand Up @@ -4930,8 +4948,8 @@ CreateDistObjectCache(void)


/*
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
* and invalidates the worker node, ConnParams, and local group ID caches.
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionCreatedState
* flag and invalidates the worker node, ConnParams, and local group ID caches.
*/
void
InvalidateMetadataSystemCache(void)
Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/varlena.h"

Expand Down Expand Up @@ -554,6 +556,9 @@ _PG_init(void)
"ColumnarSupportsIndexAM",
true, &handle);

CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);

INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr);
INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable);
INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions);
Expand Down
2 changes: 2 additions & 0 deletions src/include/distributed/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ typedef enum
ANY_CITUS_TABLE_TYPE
} CitusTableType;

void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);

extern List * AllCitusTableIds(void);
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);
Expand Down
44 changes: 44 additions & 0 deletions src/test/regress/citus_tests/test/test_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import psycopg
import pytest


def test_create_drop_citus(coord):
with coord.cur() as cur1:
with coord.cur() as cur2:
# Conn1 drops the extension
# and Conn2 cannot use it.
cur1.execute("DROP EXTENSION citus")

with pytest.raises(psycopg.errors.UndefinedFunction):
# Conn1 dropped the extension. citus_version udf
# cannot be found.sycopg.errors.UndefinedFunction
# is expected here.
cur2.execute("SELECT citus_version();")

# Conn2 creates the extension,
# Conn1 is able to use it immediadtely.
cur2.execute("CREATE EXTENSION citus")
cur1.execute("SELECT citus_version();")
cur1.execute("DROP EXTENSION citus;")

with coord.cur() as cur1:
with coord.cur() as cur2:
# A connection is able to create and use the extension
# within a transaction block.
cur1.execute("BEGIN;")
cur1.execute("CREATE TABLE t1(id int);")
cur1.execute("CREATE EXTENSION citus;")
cur1.execute("SELECT create_reference_table('t1')")
cur1.execute("ABORT;")

# Conn1 aborted so Conn2 is be able to create and
# use the extension within a transaction block.
cur2.execute("BEGIN;")
cur2.execute("CREATE TABLE t1(id int);")
cur2.execute("CREATE EXTENSION citus;")
cur2.execute("SELECT create_reference_table('t1')")
cur2.execute("COMMIT;")

# Conn2 commited so Conn1 is be able to use the
# extension immediately.
cur1.execute("SELECT citus_version();")

0 comments on commit a849570

Please sign in to comment.